Join Layer

The traversal layer takes a series of queries with edge types to follow and edge types (or node types) to terminate on. It creates a Flow[A, B] that pulls node ids from a Source[A] and emits the node ids connected to them by the specified edges.
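To make the pull semantics concrete without an Akka dependency, here is a minimal sketch that models the traversal Flow as a lazy Scala Iterator: nothing upstream is read until the consumer asks for the next id. The object name TraversalSketch, the Long node ids, and the in-memory adjacency map keyed by (node, edge type) are all hypothetical. The sketch also ignores termination conditions and simply follows a fixed sequence of edge types.

```scala
// Hypothetical model of the traversal layer, not the real implementation.
// An Iterator stands in for Flow[A, B]: elements are only computed on demand.
object TraversalSketch {
  type NodeId = Long
  // (source node, edge type) -> target nodes; an assumed in-memory graph.
  type Graph = Map[(NodeId, String), Seq[NodeId]]

  // Follows each edge type in turn, starting from the ids pulled from `in`.
  // Termination on edge or node types is deliberately left out.
  def traverse(graph: Graph, edgeTypes: Seq[String])(in: Iterator[NodeId]): Iterator[NodeId] =
    edgeTypes.foldLeft(in) { (ids, edge) =>
      ids.flatMap(id => graph.getOrElse((id, edge), Nil))
    }
}
```

With a graph Map((1L, "calls") -> Seq(2L, 3L), (2L, "arg") -> Seq(4L), (3L, "arg") -> Seq(5L)), traversing Seq("calls", "arg") from Iterator(1L) yields 4 and then 5.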

The join layer extends the traversal layer by tying multiple traversals together into a tree. This tree lets us match an entire structure of connected nodes in our graph; it is the minimum spanning tree we use to satisfy our constraint graph.

In this tree, the outputs of some edge traversals are sent as inputs to two or more other edge traversals. Branching itself is straightforward; the challenge is that we want the tree to be streamable, which requires a single start (our root query) and a single end. All the branched leaves of the tree must therefore be joined into one combined outlet.

The join layer provides both the ability to branch our edge traversals (as in B → D, E) and the ability to join the leaves of the tree into a combined output (C, D, E).
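The shape of the join tree can be sketched as a small recursive data type. This is an illustrative model, not the layer's actual representation: TreeNode and paths are hypothetical names. Each root-to-leaf path corresponds to one chain of edge traversals that ends in a leaf and must eventually be joined with the others.

```scala
// Hypothetical model of the join tree; names are illustrative only.
object JoinTreeSketch {
  // A checkpoint key and the traversals that branch out from it.
  final case class TreeNode(key: Char, children: List[TreeNode] = Nil)

  // Every root-to-leaf path, one per leaf of the tree.
  def paths(node: TreeNode): List[List[Char]] =
    if (node.children.isEmpty) List(List(node.key))
    else node.children.flatMap(child => paths(child).map(node.key :: _))
}
```

For a hypothetical tree with A → B, B → D, E and A → C, paths returns List(A, B, D), List(A, B, E) and List(A, C), one path per leaf.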

Broadcast

Suppose I want to search for a function that calls both methods meth1() and meth2() on its first argument.

function example(arg1) {
	arg1.meth1();
	arg1.meth2();
}

I would need a traversal with branching checkpoints, splitting at B into two branches, C and D.

Sending data to two different destinations is simple: use a Broadcast node in Akka Streams. Note, however, that we use it in a pull capacity, not a push one.

Pushing to a Broadcast node pushes data to n other nodes. Pulling from a Broadcast node is a bit counterintuitive. From the Akka Streams documentation for a Broadcast node: “it will emit when all of its downstream nodes stop backpressuring.” This means that both C and D need to request data for B to run. If only one of those nodes requests data, B will never run. This matters because it can easily lead to deadlocks if the terminal streams are not all consumed.
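The following toy model, which is single-threaded and not Akka's API, illustrates the pull behavior described above. PullBroadcast is a hypothetical class that only pulls from upstream once every output has signalled demand. With two outputs standing in for C and D, a request from C alone never reaches B.

```scala
import scala.collection.mutable

// Hypothetical, single-threaded model of pull-driven Broadcast semantics.
// Upstream is pulled only after every output has requested an element.
final class PullBroadcast[A](upstream: Iterator[A], outputs: Int) {
  private val demand = Array.fill(outputs)(false)
  private val buffers = Vector.fill(outputs)(mutable.Queue.empty[A])
  private var pulls = 0

  // Number of elements taken from upstream so far.
  def upstreamPulls: Int = pulls

  // Output `out` signals demand; upstream runs only when all outputs have asked.
  def request(out: Int): Unit = {
    demand(out) = true
    if (demand.forall(identity) && upstream.hasNext) {
      val a = upstream.next()
      pulls += 1
      for (i <- 0 until outputs) {
        buffers(i).enqueue(a) // every output receives the same element
        demand(i) = false     // demand is consumed and must be renewed
      }
    }
  }

  // Takes the element delivered to `out`, if any.
  def poll(out: Int): Option[A] =
    if (buffers(out).isEmpty) None else Some(buffers(out).dequeue())
}
```

If only output 0 ever calls request, upstreamPulls stays at 0 forever, which is the deadlock scenario in miniature.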

Join tree

To stream the entire tree, it is necessary to have a combined endpoint joining all the leaves of the tree.

In the relational tree for this example, the leaves E and F must be joined at the point where they diverged, B. This is done by creating another node (E<>F) that accepts the two streams E and F and joins them on the key B. The data flowing through the join layer tree is a Map from node key (A, B, C, etc.) to trace. F outputs Map[A → _, B → _, D → _, F → _] and E outputs Map[A → _, B → _, C → _, E → _], so both streams carry the trace for B and therefore the key to join on.
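A minimal sketch of the join performed by a node such as E<>F, assuming each row is a Map from node key to trace and modelling traces as plain String ids (an assumption; the real trace type is not described here). RowJoinSketch and joinOn are hypothetical names. Two rows are merged only when both carry the same trace for the join key.

```scala
// Hypothetical row-level join; traces are modelled as String ids.
object RowJoinSketch {
  // A row maps a node key (A, B, C, ...) to the trace found for that node.
  type Row = Map[Char, String]

  // Merges two rows when both hold the same trace for `key`, otherwise None.
  def joinOn(key: Char)(left: Row, right: Row): Option[Row] =
    for {
      l <- left.get(key)
      r <- right.get(key)
      if l == r
    } yield left ++ right
}
```

Joining E's Map('A' -> "a1", 'B' -> "b1", 'C' -> "c1", 'E' -> "e1") with F's Map('A' -> "a1", 'B' -> "b1", 'D' -> "d1", 'F' -> "f1") on 'B' yields one row containing all six keys.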

If there are more than two leaves, multiple joins are needed, each one keyed on the node where its two input branches diverged.
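Choosing the key for each join can be sketched as finding the last node that two root-to-leaf paths share. DivergenceSketch and divergencePoint are hypothetical names, and representing a branch as a List of node keys is an assumption made for illustration.

```scala
// Hypothetical helper: the join key for two branches is their last shared node.
object DivergenceSketch {
  // Returns the deepest common prefix node of two root-to-leaf paths, if any.
  def divergencePoint(p1: List[Char], p2: List[Char]): Option[Char] =
    p1.zip(p2).takeWhile { case (a, b) => a == b }.lastOption.map(_._1)
}
```

For E's path A, B, C, E and F's path A, B, D, F this returns Some('B'); a further leaf on a hypothetical path A, G would then be joined with the E<>F output on A.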

Streaming merge join

The meat and potatoes of this layer is a custom Akka Streams node that performs a streaming join on two keyed streams.

Streaming joins typically use a hash join algorithm where a smaller data set, defined by a time window, is kept in memory to join against the stream.

This makes sense with infinite streams, but the streams we are joining are finite and have no time window. We are using streaming to limit the work we need to do upstream.

In our case, a hash join, which consumes multiple values from one stream and holds them in memory to join against the other stream, would simply use more memory without improving algorithmic complexity or speed.
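For contrast, here is a minimal sketch of a conventional hash join over two finite iterators (HashJoinSketch is a hypothetical name). It materializes the entire build side before emitting anything, which is the memory cost the paragraph above describes. There is no time window here, since our streams are finite.

```scala
// Hypothetical hash join over finite inputs, shown only for comparison.
object HashJoinSketch {
  // Builds a hash table from the whole `build` side, then streams `probe` against it.
  def hashJoin[K, L, R](build: Iterator[(K, L)], probe: Iterator[(K, R)]): Iterator[(K, L, R)] = {
    val table: Map[K, Vector[L]] = build.toVector.groupMap(_._1)(_._2)
    probe.flatMap { case (k, r) =>
      table.getOrElse(k, Vector.empty).iterator.map(l => (k, l, r))
    }
  }
}
```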

In fact, the merge join is simply a form of hash join in which the hash holds only one value.
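A minimal sketch of a streaming merge join over plain iterators, assuming both inputs arrive sorted by key. The sketch depends on that assumption, and MergeJoinSketch and mergeJoin are hypothetical names, not the custom Akka node itself. Only the current run of equal keys from each side is held in memory, which corresponds to the hash holding a single value; runs of duplicate keys produce their cross product.

```scala
import scala.collection.BufferedIterator

// Hypothetical merge join; assumes both inputs are sorted by key.
object MergeJoinSketch {
  def mergeJoin[K, L, R](left: Iterator[(K, L)], right: Iterator[(K, R)])(
      implicit ord: Ordering[K]): Iterator[(K, L, R)] = {
    val l = left.buffered
    val r = right.buffered

    // Drains the run of consecutive elements whose key equals `k`.
    def group[V](it: BufferedIterator[(K, V)], k: K): Vector[V] = {
      val b = Vector.newBuilder[V]
      while (it.hasNext && ord.equiv(it.head._1, k)) b += it.next()._2
      b.result()
    }

    // Skips the smaller key, or emits the cross product of one matching key group.
    def step(): Iterator[(K, L, R)] = {
      val c = ord.compare(l.head._1, r.head._1)
      if (c < 0) { l.next(); Iterator.empty }
      else if (c > 0) { r.next(); Iterator.empty }
      else {
        val k = l.head._1
        val ls = group(l, k)
        val rs = group(r, k)
        for (lv <- ls.iterator; rv <- rs.iterator) yield (k, lv, rv)
      }
    }

    // Keeps stepping lazily while both sides still have elements.
    Iterator.continually(()).takeWhile(_ => l.hasNext && r.hasNext).flatMap(_ => step())
  }
}
```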
