flow/network: fix handling of edges with subrecords at both ends
This commit is contained in:
parent
75d569a12c
commit
66ac62d0bb
|
@ -75,12 +75,12 @@ class DataFlowGraph(MultiDiGraph):
|
||||||
# Returns a dictionary
|
# Returns a dictionary
|
||||||
# source -> [sink1, ..., sinkn]
|
# source -> [sink1, ..., sinkn]
|
||||||
# source element is a (node, endpoint) pair.
|
# source element is a (node, endpoint) pair.
|
||||||
# sink elements are (node, endpoint, source subrecord) triples.
|
# sink elements are (node, endpoint, source subrecord, sink subrecord) triples.
|
||||||
def _source_to_sinks(self):
|
def _source_to_sinks(self):
|
||||||
d = dict()
|
d = dict()
|
||||||
for u, v, data in self.edges_iter(data=True):
|
for u, v, data in self.edges_iter(data=True):
|
||||||
el_src = (u, data["source"])
|
el_src = (u, data["source"])
|
||||||
el_dst = (v, data["sink"], data["source_subr"])
|
el_dst = (v, data["sink"], data["source_subr"], data["sink_subr"])
|
||||||
if el_src in d:
|
if el_src in d:
|
||||||
d[el_src].append(el_dst)
|
d[el_src].append(el_dst)
|
||||||
else:
|
else:
|
||||||
|
@ -90,11 +90,11 @@ class DataFlowGraph(MultiDiGraph):
|
||||||
# Returns a dictionary
|
# Returns a dictionary
|
||||||
# sink -> [source1, ... sourcen]
|
# sink -> [source1, ... sourcen]
|
||||||
# sink element is a (node, endpoint) pair.
|
# sink element is a (node, endpoint) pair.
|
||||||
# source elements are (node, endpoint, sink subrecord) triples.
|
# source elements are (node, endpoint, sink subrecord, source subrecord) triples.
|
||||||
def _sink_to_sources(self):
|
def _sink_to_sources(self):
|
||||||
d = dict()
|
d = dict()
|
||||||
for u, v, data in self.edges_iter(data=True):
|
for u, v, data in self.edges_iter(data=True):
|
||||||
el_src = (u, data["source"], data["sink_subr"])
|
el_src = (u, data["source"], data["sink_subr"], data["source_subr"])
|
||||||
el_dst = (v, data["sink"])
|
el_dst = (v, data["sink"])
|
||||||
if el_dst in d:
|
if el_dst in d:
|
||||||
d[el_dst].append(el_src)
|
d[el_dst].append(el_src)
|
||||||
|
@ -125,25 +125,25 @@ class DataFlowGraph(MultiDiGraph):
|
||||||
if len(sources) > 1 or sources[0][2] is not None:
|
if len(sources) > 1 or sources[0][2] is not None:
|
||||||
# build combinator
|
# build combinator
|
||||||
# "layout" is filled in during instantiation
|
# "layout" is filled in during instantiation
|
||||||
subrecords = [dst_subrecord for src_node, src_endpoint, dst_subrecord in sources]
|
subrecords = [dst_subrecord for src_node, src_endpoint, dst_subrecord, src_subrecord in sources]
|
||||||
combinator = ActorNode(plumbing.Combinator, {"subrecords": subrecords})
|
combinator = ActorNode(plumbing.Combinator, {"subrecords": subrecords})
|
||||||
# disconnect source1 -> sink ... sourcen -> sink
|
# disconnect source1 -> sink ... sourcen -> sink
|
||||||
# connect source1 -> combinator_sink1 ... sourcen -> combinator_sinkn
|
# connect source1 -> combinator_sink1 ... sourcen -> combinator_sinkn
|
||||||
for n, (src_node, src_endpoint, dst_subrecord) in enumerate(sources):
|
for n, (src_node, src_endpoint, dst_subrecord, src_subrecord) in enumerate(sources):
|
||||||
self.del_connections(src_node, dst_node,
|
self.del_connections(src_node, dst_node,
|
||||||
{"source": src_endpoint, "sink": dst_endpoint})
|
{"source": src_endpoint, "sink": dst_endpoint})
|
||||||
self.add_connection(src_node, combinator,
|
self.add_connection(src_node, combinator,
|
||||||
src_endpoint, "sink{0}".format(n))
|
src_endpoint, "sink{0}".format(n), source_subr=src_subrecord)
|
||||||
# connect combinator_source -> sink
|
# connect combinator_source -> sink
|
||||||
self.add_connection(combinator, dst_node, "source", dst_endpoint)
|
self.add_connection(combinator, dst_node, "source", dst_endpoint)
|
||||||
# Insert splitters.
|
# Insert splitters.
|
||||||
for (src_node, src_endpoint), sinks in self._source_to_sinks().items():
|
for (src_node, src_endpoint), sinks in self._source_to_sinks().items():
|
||||||
if len(sinks) > 1 or sinks[0][2] is not None:
|
if len(sinks) > 1 or sinks[0][2] is not None:
|
||||||
subrecords = [src_subrecord for dst_node, dst_endpoint, src_subrecord in sinks]
|
subrecords = [src_subrecord for dst_node, dst_endpoint, src_subrecord, dst_subrecord in sinks]
|
||||||
splitter = ActorNode(plumbing.Splitter, {"subrecords": subrecords})
|
splitter = ActorNode(plumbing.Splitter, {"subrecords": subrecords})
|
||||||
# disconnect source -> sink1 ... source -> sinkn
|
# disconnect source -> sink1 ... source -> sinkn
|
||||||
# connect splitter_source1 -> sink1 ... splitter_sourcen -> sinkn
|
# connect splitter_source1 -> sink1 ... splitter_sourcen -> sinkn
|
||||||
for n, (dst_node, dst_endpoint, src_subrecord) in enumerate(sinks):
|
for n, (dst_node, dst_endpoint, src_subrecord, dst_subrecord) in enumerate(sinks):
|
||||||
self.del_connections(src_node, dst_node,
|
self.del_connections(src_node, dst_node,
|
||||||
{"source": src_endpoint, "sink": dst_endpoint})
|
{"source": src_endpoint, "sink": dst_endpoint})
|
||||||
self.add_connection(splitter, dst_node,
|
self.add_connection(splitter, dst_node,
|
||||||
|
|
Loading…
Reference in New Issue