"""Base class for directed graphs.
The Graph class allows any string as a node
Self-loops are allowed but multiple edges of a same edge type are not.
"""
import math
import mmap
import os
from struct import error, pack, unpack
from cachetools import LRUCache
from .exception import (EdgeNotFound, KeyTooLong, KinbakuError,
KinbakuException, NodeNotFound)
from .structure import Edge, Header, Node, text
from .utils import compare_edges, compare_nodes, to_string
[docs]class Graph:
[docs] def __init__(
self,
filename,
hash_func=None,
max_str_len=15,
max_key_len=15,
int_format="l",
char_format="h",
bool_format="?",
hash_format="I",
cache_len=1000000,
table_increment=100000,
preload=False,
node_class=None,
edge_class=None,
flag="w",
):
"""Initialize a directed graph. A file is automatically created
if the path provided by the filename argument does not exist.
Args:
filename (str): path to database. File created if it does not exist
hash_func (function, optional): hashing function. None means that
Google's CityHash will be used.
Defaults to None.
max_str_len (int, optional): max length of a string field.
Defaults to 15.
max_key_len (int, optional): max length of node keys.
Defaults to 15.
int_format (str, optional): format for integers as described in
struct package. Defaults to "l".
char_format (str, optional): format for characters.
Defaults to "h".
bool_format (str, optional): format for booleans.
Defaults to "?".
hash_format (str, optional): format for containing hashes.
Defaults to "I".
cache_len (int, optional): maximum size of the dictionary.
Defaults to 10000000.
table_increment (int, optional): [description]. Defaults to 100000.
preload (bool, optional): if True, all nodes attributes are loaded
on initialization. Defaults to False.
node_class (dataclass, optional): the dataclass that defines
custom node attributes.
Defaults to None.
edge_class (dataclass, optional): the dataclass that defines
custom edge attributes.
Defaults to None.
flag (str, optional): "w" for reading and writing,
"r" for reading only,
"n" for new (overwrite).
Defaults to "w".
Examples
--------
>>> G = kn.Graph("test.db")
>>> G = kn.Graph("test.db", flag="r")
>>> G = kn.Graph("test.db", hash_func=lambda x: math.abs(hash(x)))
"""
self.filename = filename
self.table_increment = table_increment
self.flag = flag
if hash_func is None:
try:
from cityhash import CityHash32
self.hash_func = CityHash32
except ImportError:
import mmh3
self.hash_func = lambda x: mmh3.hash(x, signed=False)
else:
self.hash_func = hash_func
# sizes and formats
self.max_str_len = max_str_len
self.max_key_len = max_key_len
self.int_format = int_format
self.char_format = char_format
self.bool_format = bool_format
self.hash_format = hash_format
# initialize cache
self.preload = preload
self.cache_len = cache_len
self.cache_id_to_key = LRUCache(cache_len)
self.cache_key_to_pos = LRUCache(cache_len)
self.cache_pos_to_node = LRUCache(cache_len)
self.cache_pos_to_node_tree = LRUCache(cache_len)
self.edge_tombstone = []
self.node_tombstone = []
# custom dataclasses
if node_class is not None:
self.node_class = node_class
else:
self.node_class = Node
if edge_class is not None:
self.edge_class = edge_class
else:
self.edge_class = Edge
# initialize sizes
self._init_edge_size()
self._init_node_size()
self._init_header_size()
# mmap file
self._load_file()
# =========================================================================
# Properties
# =========================================================================
@property
def n_nodes(self):
"""Number of nodes in graph
Returns:
int: number of nodes
"""
return self.header.n_nodes
@property
def n_edges(self):
"""Number of edges in graph
Returns:
int: number of edges
"""
return self.header.n_edges - self.header.n_nodes
@property
def nodes(self):
"""Iterate over all nodes
Yields:
iterator: an iterator over all nodes
"""
position = 0
leaf = self._get_node_at(position)
for node in self._node_dfs(leaf):
yield node.key
@property
def edges(self):
"""Iterate over all edges
Yields:
iterator: an iterator over all edges
"""
for edge in self._iter_edges():
yield self._get_keys_from_edge(edge)
# =========================================================================
# Parsers
# =========================================================================
def _parse_fields(self, data):
DATA_FORMAT = ""
DATA_VALUES = []
for field in data.__dataclass_fields__.values():
if field.name == "hash":
DATA_FORMAT += self.hash_format
DATA_VALUES.append(0)
elif field.type == text:
DATA_FORMAT += field.default.length * self.char_format
DATA_VALUES += (0,) * field.default.length
elif field.type == int:
DATA_FORMAT += self.int_format
DATA_VALUES.append(0)
elif field.name == "key":
DATA_FORMAT += self.max_key_len * self.char_format
DATA_VALUES += (0,) * self.max_key_len
elif field.type == str:
DATA_FORMAT += self.max_str_len * self.char_format
DATA_VALUES += (0,) * self.max_str_len
elif field.type == bool:
DATA_FORMAT += self.bool_format
DATA_VALUES.append(False)
elif field.type == float:
DATA_FORMAT += "f"
DATA_VALUES.append(0.0)
return DATA_FORMAT, DATA_VALUES
def _parse_values(self, data):
values = []
extend = values.extend
append = values.append
for field in data.__dataclass_fields__.keys():
value = getattr(data, field)
if isinstance(value, (int, bool)):
append(value)
elif isinstance(value, str):
if field != "key":
extend(self._str_to_list(value))
else:
extend(self._key_to_list(value))
elif isinstance(value, tuple):
extend(list(value))
else:
append(value)
return values
def _parse_attributes(self, leaf, attr):
if attr is None:
return
for attribute, value in attr.items():
if isinstance(value, str):
assert len(value) <= self.max_str_len
setattr(leaf, attribute, value)
# =========================================================================
# Initializers
# =========================================================================
def _init_edge_size(self):
self.EDGE_FORMAT, VALUES = self._parse_fields(self.edge_class)
self.EDGE = pack(self.EDGE_FORMAT, *VALUES)
self.EDGE_SIZE = len(self.EDGE)
def _init_node_size(self):
self.NODE_FORMAT, VALUES = self._parse_fields(self.node_class)
VALUES[0] = 1 # boolean that indicates that item is node
self.NODE = pack(self.NODE_FORMAT, *VALUES)
self.NODE_SIZE = len(self.NODE)
self.NODE_TO_EDGE_RATIO = math.ceil(self.NODE_SIZE / self.EDGE_SIZE)
self.NODE_PLACEHOLDER_SIZE = self.NODE_TO_EDGE_RATIO * self.EDGE_SIZE
# pad placeholder with 0s
self.NODE_PLACEHOLDER = self.NODE + b"\x00" * (
self.NODE_PLACEHOLDER_SIZE - self.NODE_SIZE)
# get node_tree_info format
self.NODE_TREE_FORMAT = (
2 * self.bool_format + self.hash_format + 2 * self.int_format)
self.NODE_TREE = pack(self.NODE_TREE_FORMAT, 0, 0, 0, 0, 0)
self.NODE_TREE_SIZE = len(self.NODE_TREE)
def _init_header_size(self):
HEADER_FORMAT = ""
HEADER_VALUES = []
for name, field in Header.__dataclass_fields__.items():
if field.type == int:
HEADER_FORMAT += self.int_format
if name == "table_size":
HEADER_VALUES.append(
self.table_increment + self.NODE_TO_EDGE_RATIO
)
elif name == "node_id":
HEADER_VALUES.append(1)
elif name == "next_table_position":
HEADER_VALUES.append(self.NODE_TO_EDGE_RATIO)
else:
HEADER_VALUES.append(0)
self.HEADER_FORMAT = HEADER_FORMAT
self.HEADER = pack(HEADER_FORMAT, *HEADER_VALUES)
self.HEADER_SIZE = len(self.HEADER)
# =========================================================================
# File & memory management
# =========================================================================
def _load_file(self):
# create folder if it doesn't exist
if self.flag in {"n", "w"}:
folder = os.path.dirname(self.filename)
if folder != "" and not os.path.exists(folder):
os.makedirs(folder)
# initialize new file if it doesn't exist or flag is set to NEW
if not os.path.exists(self.filename) or self.flag == "n":
with open(self.filename, "wb") as f:
f.write(self.HEADER)
f.write(self.NODE_PLACEHOLDER)
f.write(self.EDGE * self.table_increment)
self._map_to_memory()
self._get_sizes()
# insert immovable root node
root = self.node_class(hash=2147483648)
self._set_node_at(root, 0)
else:
self._map_to_memory()
self._get_sizes()
if self.preload:
for _ in self.nodes:
pass
def _map_to_memory(self):
with open(self.filename, "r+b") as f:
if self.flag == "w" or self.flag == "n":
self.mm = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_WRITE)
else:
self.mm = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
def _expand(self):
if (
self.header.next_table_position
<= self.header.table_size - 0.1 * self.table_increment
):
self.mm[: self.HEADER_SIZE] = pack(
self.HEADER_FORMAT, *self._parse_values(self.header))
return
with open(self.filename, "ab") as f:
f.write(self.EDGE * self.table_increment)
# add increment to table_size
self.header.table_size += self.table_increment
self.mm[:self.HEADER_SIZE] = pack(
self.HEADER_FORMAT, *self._parse_values(self.header))
self._map_to_memory()
def _increment_node(self, recycled):
self.header.n_nodes += 1
self.header.node_id += 1
if not recycled:
self.header.next_table_position += self.NODE_TO_EDGE_RATIO
self._expand()
def _increment_edge(self, recycled):
self.header.n_edges += 1
if not recycled:
self.header.next_table_position += 1
self._expand()
def _decrement_edge(self):
self.header.n_edges -= 1
self._expand()
def _decrement_node(self):
self.header.n_nodes -= 1
self._expand()
def _cache_node(self, node):
nkey, npos, nindex, nhash, nleft, nright = (
node.key, node.position, node.index,
node.hash, node.left, node.right)
self.cache_key_to_pos[nkey] = npos
self.cache_id_to_key[nindex] = nkey
self.cache_pos_to_node[npos] = node
self.cache_pos_to_node_tree[npos] = (nhash, nleft, nright)
def _uncache_node(self, node):
npos = node.position
try:
del self.cache_key_to_pos[node.key]
except KeyError:
pass
try:
del self.cache_id_to_key[node.index]
except KeyError:
pass
try:
del self.cache_pos_to_node[npos]
except KeyError:
pass
try:
del self.cache_pos_to_node_tree[npos]
except KeyError:
pass
def empty_cache(self):
self.cache_id_to_key = LRUCache(self.cache_len)
self.cache_key_to_pos = LRUCache(self.cache_len)
self.cache_pos_to_node = LRUCache(self.cache_len)
self.cache_pos_to_node_tree = LRUCache(self.cache_len)
self._get_sizes()
def find_tombstones(self):
EDGE_SIZE = self.EDGE_SIZE
HEADER_SIZE = self.HEADER_SIZE
NODE_TO_EDGE_RATIO = self.NODE_TO_EDGE_RATIO
position = 0
size = len(pack("?" + self.int_format, 0, 0))
while position <= self.header.next_table_position:
ind = position * EDGE_SIZE + HEADER_SIZE
is_node, index = unpack("?" + self.int_format,
self.mm[ind: ind + size])
if is_node:
if index == 0:
self.node_tombstone.append(position)
position += NODE_TO_EDGE_RATIO
else:
if index == 0:
self.edge_tombstone.append(position)
position += 1
# =========================================================================
# Tree traversal
# =========================================================================
def _iter_edges(self):
EDGE_SIZE = self.EDGE_SIZE
HEADER_SIZE = self.HEADER_SIZE
NODE_TO_EDGE_RATIO = self.NODE_TO_EDGE_RATIO
position = 0
while position <= self.header.next_table_position:
ind = position * EDGE_SIZE + HEADER_SIZE
is_node, exists = unpack("??", self.mm[ind: ind + 2])
if is_node:
position += NODE_TO_EDGE_RATIO
else:
if not exists:
position += 1
continue
edge = self._get_edge_at(position)
position += 1
if edge.is_edge_start:
continue
yield edge
def _find_node_pos(self, position, new_node):
_get_node_at = self._get_node_at
_get_node_tree_info_at = self._get_node_tree_info_at
current_hash, current_left, current_right = (
_get_node_tree_info_at(position))
new_node_hash = new_node.hash
while 1:
if new_node_hash < current_hash:
state = -1
elif new_node_hash > current_hash:
state = 1
else:
current_node = _get_node_at(position)
state = compare_nodes(current_node.hash,
current_node.key, new_node)
if state == -1:
if current_left != 0:
position = current_left
current_hash, current_left, current_right = (
_get_node_tree_info_at(position))
continue
break
elif state == 1:
if current_right != 0:
position = current_right
current_hash, current_left, current_right = (
_get_node_tree_info_at(position))
continue
break
else: # is equal
break
current_node = _get_node_at(position)
return current_node, state
def _find_edge_out_pos(self, position, new_edge):
_get_edge_at = self._get_edge_at
current_edge = _get_edge_at(position)
while 1:
state = compare_edges(current_edge, new_edge)
if state == -1: # go left
if current_edge.out_edge_left != 0:
position = current_edge.out_edge_left
current_edge = _get_edge_at(position)
continue
break
elif state == 1: # go right
if current_edge.out_edge_right != 0:
position = current_edge.out_edge_right
current_edge = _get_edge_at(position)
continue
break
else: # is equal
break
return current_edge, state
def _find_edge_in_pos(self, position, new_edge):
_get_edge_at = self._get_edge_at
current_edge = _get_edge_at(position)
while 1:
state = compare_edges(current_edge, new_edge)
if state == -1: # go left
if current_edge.in_edge_left != 0:
position = current_edge.in_edge_left
current_edge = _get_edge_at(position)
continue
break
elif state == 1: # go right
if current_edge.in_edge_right != 0:
position = current_edge.in_edge_right
current_edge = _get_edge_at(position)
continue
break
else: # is equal
break
return current_edge, state
def _find_inorder_successor_edge(self, edge, out=True):
_get_edge_at = self._get_edge_at
edge_right = edge.out_edge_right if out else edge.in_edge_right
left = "out_edge_left" if out else "in_edge_left"
successor = _get_edge_at(edge_right)
antecedent = edge
while getattr(successor, left) != 0:
antecedent = successor
successor = _get_edge_at(getattr(successor, left))
return (successor, antecedent)
def _find_inorder_successor_node(self, node):
_get_node_at = self._get_node_at
successor = _get_node_at(node.right)
antecedent = node
while successor.left != 0:
antecedent = successor
successor = _get_node_at(successor.left)
return (successor, antecedent)
def _node_dfs(self, node):
if node.index != 0:
yield node
# store in cache
self._cache_node(node)
if node.left != 0:
yield from self._node_dfs(self._get_node_at(node.left))
if node.right != 0:
yield from self._node_dfs(self._get_node_at(node.right))
def _edge_out_dfs(self, edge):
if edge.out_edge_left != 0:
yield from self._edge_out_dfs(
self._get_edge_at(edge.out_edge_left))
if edge.out_edge_right != 0:
yield from self._edge_out_dfs(
self._get_edge_at(edge.out_edge_right))
if not edge.is_edge_start:
yield edge
def _edge_in_dfs(self, edge):
if edge.in_edge_left != 0:
yield from self._edge_in_dfs(self._get_edge_at(edge.in_edge_left))
if edge.in_edge_right != 0:
yield from self._edge_in_dfs(self._get_edge_at(edge.in_edge_right))
if not edge.is_edge_start:
yield edge
def _unplug_edge(self, parent, state, out=True):
if state == -1: # edge came from left
if out:
parent.out_edge_left = 0
else:
parent.in_edge_left = 0
else:
if out:
parent.out_edge_right = 0
else:
parent.in_edge_right = 0
self._set_edge_at(parent, parent.position)
def _unplug_node(self, parent, state):
if state == -1:
parent.left = 0
else:
parent.right = 0
self._set_node_at(parent, parent.position)
def _rewire_edge(self, parent, child, state, out=True):
if state == -1:
if out:
parent.out_edge_left = child.position
else:
parent.in_edge_left = child.position
else:
if out:
parent.out_edge_right = child.position
else:
parent.in_edge_right = child.position
if out:
child.out_edge_parent = parent.position
else:
child.in_edge_parent = parent.position
self._set_edge_at(parent, parent.position)
self._set_edge_at(child, child.position)
def _rewire_node(self, parent, child, state):
if state == -1:
parent.left = child.position
else:
parent.right = child.position
child.parent = parent.position
self._set_node_at(parent, parent.position)
self._set_node_at(child, child.position)
def _remove_node_from_tree(self, node):
parent = self._get_node_at(node.parent)
# find state
if parent.left == node.position:
state = -1
elif parent.right == node.position:
state = 1
else:
raise KinbakuError("state == 0")
node_left = node.left
node_right = node.right
# case 1: node to remove has no child
if node_left == 0 and node_right == 0:
self._unplug_node(parent, state)
# case 2: node to remove has only one child
elif node_left == 0:
child = self._get_node_at(node_right)
self._rewire_node(parent, child, state)
elif node_right == 0:
child = self._get_node_at(node_left)
self._rewire_node(parent, child, state)
# case 3: node to remove has two children
else:
successor, antecedent = self._find_inorder_successor_node(node)
# remove antecedent link to successor
antecedent.left = 0
# set successor's parent to parent
successor.parent = parent.position
if state == -1:
parent.left = successor.position
else:
parent.right = successor.position
# case a: antecedent happens to be the node to remove
if antecedent.position == node.position:
successor.left = node_left
node_left_item = self._get_node_at(node_left)
node_left_item.parent = successor.position
self._set_node_at(node_left_item, node_left)
self._set_node_at(successor, successor.position)
self._set_node_at(parent, parent.position)
# case b: antecedent is further down
else:
# put left tree in left child of successor
successor.left = node_left
node_left_item = self._get_node_at(node_left)
node_left_item.parent = successor.position
self._set_node_at(node_left_item, node_left)
if node_right == antecedent.position:
antecedent.parent = successor.position
else:
node_right_item = self._get_node_at(node_right)
node_right_item.parent = successor.position
self._set_node_at(node_right_item, node_right)
# put right tree of successor to antecdent left tree
successor_right_pos = successor.right
antecedent.left = successor_right_pos
self._set_node_at(antecedent, antecedent.position)
if successor_right_pos != 0:
successor_right = self._get_node_at(successor_right_pos)
successor_right.parent = antecedent.position
self._set_node_at(successor_right, successor_right_pos)
successor.right = node_right
self._set_node_at(successor, successor.position)
self._set_node_at(parent, parent.position)
def _remove_edge_from_tree(self, edge, out=True):
# utilitary variables
if out:
edge_left = edge.out_edge_left
edge_right = edge.out_edge_right
parent = self._get_edge_at(edge.out_edge_parent)
else:
edge_left = edge.in_edge_left
edge_right = edge.in_edge_right
parent = self._get_edge_at(edge.in_edge_parent)
state = compare_edges(parent, edge)
# case 1: edge to remove has no child
if edge_left == 0 and edge_right == 0:
if parent.position < 0:
print(edge)
print(parent)
raise ValueError
self._unplug_edge(parent, state, out)
# case 2: edge to remove has only one child
elif edge_left == 0:
child = self._get_edge_at(edge_right)
self._rewire_edge(parent, child, state, out)
elif edge_right == 0:
child = self._get_edge_at(edge_left)
self._rewire_edge(parent, child, state, out)
# case 3: edge to remove has two children
else:
successor, antecedent = self._find_inorder_successor_edge(
edge, out=out)
right = "out_edge_right" if out else "in_edge_right"
left = "out_edge_left" if out else "in_edge_left"
up = "out_edge_parent" if out else "in_edge_parent"
# remove antecedent link to successor
setattr(antecedent, left, 0)
# set successor's parent to parent
setattr(successor, up, parent.position)
if state == -1:
setattr(parent, left, successor.position)
else:
setattr(parent, right, successor.position)
# case a: antecedent happens to be the edge to remove
if antecedent.position == edge.position:
setattr(successor, left, edge_left)
edge_left_item = self._get_edge_at(edge_left)
setattr(edge_left_item, up, successor.position)
self._set_edge_at(edge_left_item, edge_left)
self._set_edge_at(successor, successor.position)
self._set_edge_at(parent, parent.position)
# case b: antecedent is further down
else:
# put left tree in left child of successor
setattr(successor, left, edge_left)
edge_left_item = self._get_edge_at(edge_left)
setattr(edge_left_item, up, successor.position)
self._set_edge_at(edge_left_item, edge_left)
if edge_right == antecedent.position:
setattr(antecedent, up, successor.position)
else:
edge_right_item = self._get_edge_at(edge_right)
setattr(edge_right_item, up, successor.position)
self._set_edge_at(edge_right_item, edge_right)
# put right tree of successor to antecdent left tree
successor_right_pos = getattr(successor, right)
setattr(antecedent, left, successor_right_pos)
self._set_edge_at(antecedent, antecedent.position)
if successor_right_pos != 0:
successor_right = self._get_edge_at(successor_right_pos)
setattr(successor_right, up, antecedent.position)
self._set_edge_at(successor_right, successor_right_pos)
setattr(successor, right, edge_right)
self._set_edge_at(successor, successor.position)
self._set_edge_at(parent, parent.position)
# =========================================================================
# Getters
# =========================================================================
def _get_next_node_position(self):
if len(self.node_tombstone) == 0:
return self.header.next_table_position, False
return self.node_tombstone.pop(0), True
def _get_next_edge_position(self):
if len(self.edge_tombstone) == 0:
recycled = False
return self.header.next_table_position, recycled
recycled = True
return self.edge_tombstone.pop(0), recycled
def _get_sizes(self):
data = unpack(self.HEADER_FORMAT, self.mm[:self.HEADER_SIZE])
self.header = Header(*data)
def _get_node_tree_info_at(self, position):
data = self.cache_pos_to_node_tree.get(position)
if data is not None:
return data
ind = position * self.EDGE_SIZE + self.HEADER_SIZE
_, _, hash, left, right = unpack(
self.NODE_TREE_FORMAT, self.mm[ind: ind + self.NODE_TREE_SIZE])
self.cache_pos_to_node_tree[position] = hash, left, right
return hash, left, right
def _get_node_at(self, position):
node = self.cache_pos_to_node.get(position)
if node is not None:
return node
ind = position * self.EDGE_SIZE + self.HEADER_SIZE
try:
data = unpack(self.NODE_FORMAT, self.mm[ind: ind + self.NODE_SIZE])
except error:
self._map_to_memory()
data = unpack(self.NODE_FORMAT, self.mm[ind: ind + self.NODE_SIZE])
i = 0
res = []
append = res.append
for field in self.node_class.__dataclass_fields__.values():
if field.type != tuple and field.type != str:
append(data[i])
i += 1
elif field.name == "key":
append(to_string(data[i: i + self.max_key_len]))
i += self.max_key_len
elif field.type == str:
append(to_string(data[i: i + self.max_str_len]))
i += self.max_str_len
node = self.node_class(*res)
if node.index not in self.cache_id_to_key:
self._cache_node(node)
return node
def _get_edge_at(self, position):
ind = position * self.EDGE_SIZE + self.HEADER_SIZE
data = unpack(self.EDGE_FORMAT, self.mm[ind: ind + self.EDGE_SIZE])
edge = self.edge_class(*data)
return edge
def _get_edge_hash(self, source, target, edge_type):
return self.hash_func(
"_".join((str(source.hash), str(edge_type), str(target.hash))))
def _get_keys_from_edge(self, edge):
src_pos = edge.source_position
u = self._get_node_at(src_pos).key
tgt_pos = edge.target_position
v = self._get_node_at(tgt_pos).key
return u, v
# =========================================================================
# Public methods
# =========================================================================
[docs] def close(self):
"""Closes database file."""
self.mm.close()
[docs] def neighbors(self, u):
"""Iterate over all nodes v such that (u, v) is an edge
Args:
u (str): key of the source node
Yields:
iterator: iterator of node keys
"""
leaf = self.node(u)
start = self._get_edge_at(leaf.edge_start)
for edge in self._edge_out_dfs(start):
res = self._get_node_at(edge.target_position)
yield res.key
[docs] def predecessors(self, v):
"""Iterate over all nodes u such that (u, v) is an edge
Args:
v ([type]): key of the target node
Yields:
iterator: iterator of node keys
"""
leaf = self.node(v)
start = self._get_edge_at(leaf.edge_start)
for edge in self._edge_in_dfs(start):
res = self._get_node_at(edge.source_position)
yield res.key
[docs] def set_neighbors(self, u, new_neighbors):
"""Strictly assign predecessors to a node
Args:
v ([type]): key of the target node
new_predecessors: list or set of the sources key
"""
remove_edge = self.remove_edge
add_edge = self.add_edge
self.add_node(u)
new_neighbors = set(new_neighbors)
old_neighbors = set(self.neighbors(u))
to_add = new_neighbors.difference(old_neighbors)
to_remove = old_neighbors.difference(new_neighbors)
for v in to_remove:
remove_edge(u, v)
for v in to_add:
add_edge(u, v)
[docs] def set_predecessors(self, v, new_predecessors):
"""Strictly assign predecessors to a node
Args:
v ([type]): key of the target node
new_predecessors: list or set of the sources key
"""
remove_edge = self.remove_edge
add_edge = self.add_edge
self.add_node(v)
new_predecessors = set(new_predecessors)
old_predecessors = set(self.predecessors(v))
to_add = new_predecessors.difference(old_predecessors)
to_remove = old_predecessors.difference(new_predecessors)
for u in to_remove:
remove_edge(u, v)
for u in to_add:
add_edge(u, v)
[docs] def neighbors_from(self, nodes):
"""Returns the list of neighbors for all given nodes
Args:
nodes (list): list of node keys
Returns:
dict: a dict mapping node keys to the list of their neighbors
"""
nbs = []
# NOTE: not a oneliner as it would block multithreading
for node in nodes:
nbs.append(self.neighbors(node))
return nbs
[docs] def predecessors_from(self, nodes, n_jobs=-1):
"""Returns the list of predecessors for all given nodes
Args:
nodes (list): list of node keys
n_jobs (int, optional): The number of cpus to use. All available
cpus are used if n_jobs==-1.
Defaults to -1.
Returns:
dict: a dict mapping node keys to the list of their predecessors
"""
nbs = []
# NOTE: not a oneliner as it would block multithreading
for node in nodes:
nbs.append(self.predecessors(node))
return nbs
[docs] def common_neighbors(self, u, v):
"""Returns the set of common neighbors between two nodes
Args:
u (str): key of the first node
v (str): key of the second node
Returns:
set: the set of all common neighbors
"""
u_nbs = set(self.neighbors(u))
v_nbs = set(self.neighbors(v))
return u_nbs.intersection(v_nbs)
[docs] def common_predecessors(self, u, v):
"""Returns the set of common predecessors between two nodes
Args:
u (str): key of the first node
v (str): key of the second node
Returns:
set: the set of all common predecessors
"""
u_nbs = set(self.predecessors(u))
v_nbs = set(self.predecessors(v))
return u_nbs.intersection(v_nbs)
def out_degree(self, key):
# returns out-degree
count = 0
for _ in self.neighbors(key):
count += 1
return count
def in_degree(self, key):
# returns in-degree
count = 0
for _ in self.predecessors(key):
count += 1
return count
[docs] def node(self, key):
"""Get node from key
Args:
key (str): unique string identifier of the node
Raises:
NodeNotFound: the key does not match any node in the graph
Returns:
node_class: node
"""
# if key is already a node object
if isinstance(key, self.node_class):
return key
if len(key) > self.max_key_len:
raise KeyTooLong
# if key is in cache
pos = self.cache_key_to_pos.get(key)
if pos is not None:
node = self.cache_pos_to_node.get(pos)
if node is not None:
return node
key_hash = self.hash_func(key)
node = self.node_class(hash=key_hash, key=key)
# unroll tree
position = 0
prev_node, state = self._find_node_pos(position, node)
if state == 0:
self._cache_node(prev_node)
return prev_node
else:
raise NodeNotFound
[docs] def edge(self, source, target, edge_type=0):
"""Get edge from source, target and edge type
Args:
source (str): key of the source node
target (str): key of the target node
edge_type (int): type of edge to match
Raises:
EdgeNotFound: the arguments do not match any edge in the graph
Returns:
edge_class: edge
"""
# get source and target nodes
source = self.node(source)
target = self.node(target)
# new edge to create
new_edge = self.edge_class(source_position=source.position,
target_position=target.position,
hash=self._get_edge_hash(
source, target, edge_type),
type=edge_type)
edge, state = self._find_edge_out_pos(source.edge_start, new_edge)
# edge already exists
if state == 0:
return edge
raise EdgeNotFound(f"Edge {source.key} -> {target.key} not found")
[docs] def has_node(self, node):
"""Returns True if node exists
Args:
node (str): string key of the node
Returns:
bool: True if node exists, False otherwise
"""
try:
self.node(node)
return True
except NodeNotFound:
return False
[docs] def has_edge(self, source, target, edge_type=0):
"""Returns True if (source, target[, edge_type]) exists
Args:
source (str): key of source node
target (str): key of target node
edge_type (int, optional): edge type to match. Defaults to 0.
Returns:
bool: True if edge exists, False otherwise
"""
try:
self.edge(source, target, edge_type)
return True
except EdgeNotFound:
return False
[docs] def batch_get_nodes(self, batch_size=100, cursor=0):
"""Get a batch of nodes starting from a given table position
Args:
batch_size (int): number of nodes to return per batch
cursor (int): starting position in the table
Returns:
list[node_class]: list of nodes
int: cursor for the next batch. Equals -1 if the end is reached
"""
_get_node_at = self._get_node_at
EDGE_SIZE = self.EDGE_SIZE
HEADER_SIZE = self.HEADER_SIZE
NODE_TO_EDGE_RATIO = self.NODE_TO_EDGE_RATIO
position = cursor
n_nodes = 0
nodes = []
append = nodes.append
next_table_position = self.header.next_table_position
while (
position <= next_table_position and
n_nodes < batch_size
):
ind = position * EDGE_SIZE + HEADER_SIZE
is_node, exists = unpack("??", self.mm[ind: ind + 2])
if is_node:
if not exists or position == 0:
position += NODE_TO_EDGE_RATIO
continue
node = _get_node_at(position)
append(node)
position += NODE_TO_EDGE_RATIO
n_nodes += 1
else:
position += 1
if position > next_table_position:
position = -1
return nodes, position
[docs] def batch_get_edges(self, batch_size=100, cursor=0):
"""Get a batch of edges starting from a given table position
Args:
batch_size (int): number of edges to return per batch
cursor (int): starting position in the table
Returns:
list[tuple]: list of edges (tuple of str)
int: cursor for the next batch. Equals -1 if the end is reached
"""
_get_edge_at = self._get_edge_at
_get_keys_from_edge = self._get_keys_from_edge
EDGE_SIZE = self.EDGE_SIZE
HEADER_SIZE = self.HEADER_SIZE
NODE_TO_EDGE_RATIO = self.NODE_TO_EDGE_RATIO
position = cursor
n_edges = 0
edges = []
append = edges.append
next_table_position = self.header.next_table_position
while (
position <= next_table_position and
n_edges < batch_size
):
ind = position * EDGE_SIZE + HEADER_SIZE
is_node, exists = unpack("??", self.mm[ind: ind + 2])
if is_node:
position += NODE_TO_EDGE_RATIO
else:
if not exists:
position += 1
continue
edge = _get_edge_at(position)
position += 1
if edge.is_edge_start:
continue
append(_get_keys_from_edge(edge))
n_edges += 1
if position > next_table_position:
position = -1
return edges, position
[docs] def adjacency_matrix(self, weight=None):
"""Return adjacency matrix of the graph
Args:
weight (str): NOT IMPLEMENTED! Weight attribute
Returns:
scipy.sparse.csr_matrix: sparse matrix representing the graph
"""
import numpy as np
from scipy.sparse import csr_matrix
node_to_index = {}
index_to_node = {}
index = 0
xs = []
ys = []
data = []
for source, target in self.edges:
source_id = node_to_index.get(source)
if source_id is None:
node_to_index[source] = index
index_to_node[index] = source
source_id = index
index += 1
target_id = node_to_index.get(target)
if target_id is None:
node_to_index[target] = index
index_to_node[index] = target
target_id = index
index += 1
xs.append(source_id)
ys.append(target_id)
if weight is None:
data.append(1)
if weight is None:
dtype = np.bool_
A = csr_matrix((data, (xs, ys)), shape=(index, index), dtype=dtype)
return A, index_to_node
[docs] def subgraph(self, nodes, weight=None):
"""Return adjacency matrix of a subgraph
Args:
nodes (list): subset of nodes to consider for the subgraph
weight (str): NOT IMPLEMENTED! Weight attribute
Returns:
scipy.sparse.csr_matrix: sparse matrix representing the subgraph
"""
import numpy as np
from scipy.sparse import csr_matrix
index_to_node = dict(enumerate(set(nodes)))
node_to_index = {v: k for k, v in index_to_node.items()}
n_nodes = len(index_to_node)
xs = []
ys = []
data = []
for source in nodes:
source_id = node_to_index[source]
for target in self.neighbors(source):
target_id = node_to_index.get(target, None)
if target_id is None:
continue
xs.append(source_id)
ys.append(target_id)
if weight is None:
data.append(1)
if weight is None:
dtype = np.bool_
A = csr_matrix((data, (xs, ys)), shape=(n_nodes, n_nodes), dtype=dtype)
return A, index_to_node
# =========================================================================
# Overload
# =========================================================================
[docs] def __getitem__(self, item):
"""Get node or edge
Args:
item (str or tuple): if one string is provided, returns
corresponding node.
If two strings are provided (+ an optional
edge_type), return the corresponding edge.
Returns:
node_class or edge_class: node or edge
"""
if isinstance(item, str):
return self.node(item)
elif isinstance(item, tuple):
return self.edge(*item)
[docs] def __contains__(self, item):
"""Returns True if node or edge exists
Args:
item (str or tuple): if item is a string, return has_node(item),
if item is a tuple, return has_edge(...)
Raises:
KinbakuException: query malformed
Returns:
bool: True if edge or node exists, False otherwise
"""
if isinstance(item, tuple):
if 2 <= len(item) <= 3:
return self.has_edge(*item)
elif isinstance(item, str):
return self.has_node(item)
raise KinbakuException("argument not understood")
def __del__(self):
self.close()
# =========================================================================
# Setters
# =========================================================================
def _set_node_at(self, leaf, position):
values = self._parse_values(leaf)
ind = position * self.EDGE_SIZE + self.HEADER_SIZE
self.mm[ind: ind + self.NODE_SIZE] = pack(self.NODE_FORMAT, *values)
self._cache_node(leaf)
def _set_edge_at(self, edge, position):
values = self._parse_values(edge)
ind = position * self.EDGE_SIZE + self.HEADER_SIZE
try:
self.mm[ind: ind + self.EDGE_SIZE] = pack(
self.EDGE_FORMAT, *values)
except IndexError:
print(self.header.table_size, ind + self.EDGE_SIZE, ind)
raise IndexError
def _erase_edge_at(self, position):
ind = position * self.EDGE_SIZE + self.HEADER_SIZE
self.mm[ind: ind + self.EDGE_SIZE] = self.EDGE
self.edge_tombstone.append(position)
self._decrement_edge()
def _erase_node(self, node):
self._uncache_node(node)
ind = node.position * self.EDGE_SIZE + self.HEADER_SIZE
self.mm[ind: ind + self.NODE_SIZE] = self.NODE
self.node_tombstone.append(node.position)
self._decrement_node()
# also remove edge start
ind = node.edge_start * self.EDGE_SIZE + self.HEADER_SIZE
self.mm[ind: ind + self.EDGE_SIZE] = self.EDGE
self.edge_tombstone.append(node.edge_start)
self._decrement_edge()
# =========================================================================
# Create & delete Nodes & Edges
# =========================================================================
[docs] def add_node(self, key, attr=None):
"""Add a single node to graph, with optional attributes.
Args:
key (str): string key uniquely identifying a node
attr (dict, optional): custom attributes. Must match the
additional attributes provided in the
`node_class` parameter. Defaults to None.
Returns:
node_class: returns node as an instance of Graph:`node_class`
"""
# key must be of appropriate size
if len(key) > self.max_key_len:
raise KeyTooLong
key_hash = self.hash_func(key)
# new node
new_node = self.node_class(
hash=key_hash, index=self.header.node_id, key=key)
self._parse_attributes(new_node, attr)
# initialize position from cache
position = self.cache_key_to_pos.get(key)
if position is None:
position = 0
# unroll tree
prev_node, state = self._find_node_pos(position, new_node)
# node already exists
if state == 0:
if new_node == prev_node:
return prev_node
(
new_node.left,
new_node.right,
new_node.index,
new_node.position,
new_node.parent,
new_node.edge_start
) = (
prev_node.left,
prev_node.right,
prev_node.index,
prev_node.position,
prev_node.parent,
prev_node.edge_start
)
self._set_node_at(new_node, new_node.position)
return new_node
# new node and edge positions
new_node_position, node_recycled = self._get_next_node_position()
self._increment_node(node_recycled)
new_edge_position, edge_recycled = self._get_next_edge_position()
self._increment_edge(edge_recycled)
# new node
new_node.position, new_node.edge_start, new_node.parent = (
new_node_position, new_edge_position, prev_node.position)
self._set_node_at(new_node, new_node_position)
# new 'dummy' edge
edge = self.edge_class(source_position=new_node_position,
hash=new_node.hash,
is_edge_start=True,
position=new_node.edge_start)
self._set_edge_at(edge, new_node.edge_start)
# update parent node
if state == -1:
prev_node.left = new_node_position
else:
prev_node.right = new_node_position
self._set_node_at(prev_node, prev_node.position)
return new_node
[docs] def add_edge(self, source_key, target_key, attr=None, edge_type=0):
"""Add a single edge with custom attributes to graph
Args:
source_key (str): string key of the source node
target_key (str): string key of the target node
attr (dict, optional): not yet implemented. Defaults to None.
edge_type (int, optional): integer identifier of the edge type.
Defaults to 0.
Returns:
edge_class: returns edge as an instance of Graph:`edge_class`
"""
try:
source = self.node(source_key)
except NodeNotFound:
source = self.add_node(source_key)
try:
target = self.node(target_key)
except NodeNotFound:
target = self.add_node(target_key)
# new edge to create
new_edge = self.edge_class(source_position=source.position,
target_position=target.position,
hash=self._get_edge_hash(
source, target, edge_type),
type=edge_type)
self._parse_attributes(new_edge, attr)
# =====================================================================
# OUT direction
prev_out, state = self._find_edge_out_pos(source.edge_start, new_edge)
if state == 0: # edge already exists
if prev_out == new_edge:
return prev_out
new_edge_position = prev_out.position
(
new_edge.position,
new_edge.source_position,
new_edge.target_position,
new_edge.out_edge_left,
new_edge.out_edge_right,
new_edge.out_edge_parent,
new_edge.in_edge_left,
new_edge.in_edge_right,
new_edge.in_edge_parent
) = (
new_edge_position,
prev_out.source_position,
prev_out.target_position,
prev_out.out_edge_left,
prev_out.out_edge_right,
prev_out.out_edge_parent,
prev_out.in_edge_left,
prev_out.in_edge_right,
prev_out.in_edge_parent
)
self._set_edge_at(new_edge, new_edge_position)
return new_edge
else:
self._parse_attributes(new_edge, attr)
new_edge_position, recycled = self._get_next_edge_position()
new_edge.position = new_edge_position
if state == -1: # must insert left
prev_out.out_edge_left = new_edge_position
else: # must insert right
prev_out.out_edge_right = new_edge_position
# update previous out-edge
self._set_edge_at(prev_out, prev_out.position)
# =====================================================================
# IN direction
prev_in, state = self._find_edge_in_pos(target.edge_start, new_edge)
if state == -1:
prev_in.in_edge_left = new_edge_position
elif state == 1:
prev_in.in_edge_right = new_edge_position
else: # edge shouldn't exist
raise KinbakuError("serious integrity error")
# update previous in-edge
self._set_edge_at(prev_in, prev_in.position)
# =====================================================================
# insert new edge
new_edge.out_edge_parent = prev_out.position
new_edge.in_edge_parent = prev_in.position
self._set_edge_at(new_edge, new_edge_position)
self._increment_edge(recycled)
return new_edge
[docs] def remove_edge(self, source_key, target_key, edge_type=0):
"""Remove the edge linking source to target, with the given edge_type
Args:
source_key (str): string key of the source node
target_key (str): string key of the target node
edge_type (int, optional): edge type of the edge to remove.
Defaults to 0.
"""
edge = self.edge(source_key, target_key, edge_type)
self._remove_edge(edge)
def _remove_edge(self, edge):
self._remove_edge_from_tree(edge, out=True)
self._remove_edge_from_tree(edge, out=False)
self._erase_edge_at(edge.position)
[docs] def remove_node(self, key):
"""Remove node and incident edges from graph
Args:
key (str): string key of the node to remove
"""
source = self.node(key)
edge_start = self._get_edge_at(source.edge_start)
existing_edges = (
list(self._edge_out_dfs(edge_start)) +
list(self._edge_in_dfs(edge_start)))
for edge in existing_edges:
edge = self._get_edge_at(edge.position)
if not edge.exists:
continue
self._remove_edge(edge)
# erase node from file
self._remove_node_from_tree(source)
self._erase_node(source)
[docs] def __setitem__(self, key, attr):
"""Create/update node with custom attributes
Args:
key (str): node key
attr (dict): attributes as provided in node_class
Returns:
node_class: inserted node
"""
if isinstance(key, tuple):
return self.add_edge(key[0], key[1], attr=attr)
return self.add_node(key, attr=attr)
# =========================================================================
# Utils
# =========================================================================
def _str_to_list(self, key):
res = [ord(c) for c in key]
res += [0] * (self.max_str_len - len(res))
return res
def _key_to_list(self, key):
res = [ord(c) for c in key]
res += [0] * (self.max_key_len - len(res))
return res