Una pequeña experiencia con Spark Graph

 

Por David González
Big Data Manager en OpenSistemas

 

Las bases de datos de grafos están cogiendo mucha fuerza durante los últimos años, especialmente cuando hay que estudiar relaciones entre las entidades, o generar relaciones nuevas entre entidades relacionadas de distinta manera.

Neo4J es una de las bases de datos más destacadas, funciona francamente bien, quizás su punto más interesante es que el tiempo de una consulta es lineal e independiente del tamaño del grafo global, sólo se tiene en cuenta el subgrafo que atañe al estudio. Otro punto muy fuerte es el lenguaje de consulta, cypher, con el que podemos hacer consultas de forma sencilla y natural, que sobre una base de datos relacional llevaría numerosos joins.
Si quisiéramos obtener todas las personas que han trabajado con Keanu Reeves, podríamos obtenerlo con la siguiente consulta:
match (p:Person{name:”keanu reeves”})-[]->(m:Movie)<-[]-(p2:Person)
return p2Como veis es muy intuitivo.
Pero ¿qué pasa cuando nuestro universo de datos es tan grande que no podemos utilizar la versión community de Neo4j? ¿O cuando nuestros datos no responden a un único grafo, ya que en función del área de negocio que estudie los datos, las relaciones son diferentes o tienen matices? Una buena solución es utilizar Spark. Hace unos meses resolvíamos esta problemática utilizando graphX, pero era tedioso generar las nuevas relaciones y realizar las busquedas, ya que teníamos que utilizar map reduce sobre los rdds del grafo.
Ahora utilizamos Spark Graphframe, cuya principal diferencia es que el grafo se construye utilizando dataframes en lugar de rdds. Sin embargo, para nosotros otra diferencia muy importante es que permite realizar búsquedas motif, al estilo cypher.
Vamos a ilustrar un ejemplo del uso de Spark Graph usando motif para buscar/crear información. Por situaros un poco en contexto, tenemos una tabla en impala con relaciones de primer nivel entre personas recogidas mediante formulario, es decir, son datos originales suministrados por los clientes. A partir de esos datos, vamos a crear nuevas relaciones familiares. Nótese que son millones de registros, y si quisieramos utilizar Neo4J tendríamos que migrarlo todo. Además esta tabla se va alimentando continuamente, por lo que no es una opción.
Las relaciones que tenemos actualmente son “cónyuge”, “hij@”, “padre_o_madre”.
Supongamos que ‘v’ es un dataframe de vertices y ‘e’, un dataframe de aristas. Lo primero que haríamos sería construir el grafo con la siguiente llamada:

val g = GraphFrame(v, e)

Como consideraciones, el dataframe ‘v’ debe tener al menos 3 columnas: src, dst y relationship. El dataframe ‘e’ sólo necesita tener id. Los nodos serían las personas (v) y las relaciones las aristas del grafo (e).

¿Como haríamos para sacar los hermanos con estas relaciones?, sencillo, mi heraman@ es hij@ de mi “padre_o_madre” o dicho de otro modo, yo soy hij@ del padre_o_madre de mi herman@.
val hermanos = g.find(“(a)-[r1]->(b);(b)-[r2]->(c)”
.filter(“r1.relationship = ‘hij@’ and r2.relationship = ‘padre_o_madre and a.id != c.id“)

En primer lugar, vamos a crear un subgrafo de ‘g’ que contiene todas las relaciones posibles que cumplan que un nodo a tiene un tipo de relación con un nodo ‘b’ y, que a su vez, tiene un tipo de relación con un nodo ‘c’. Después, vamos a filtrar el subgrafo, para quedarnos solo con los hermanos, en este caso, la primera relación debe ser de “hij@” y la segunda de “padre_o_madre”, siendo importante la última comprobación, para que yo mismo no cumpla las condiciones para ser mi hermano (ya que siempre seré hijo de mis padres).

Como en este ejemplo no se generan nuevos vértices, sólo nuevas relaciones. Crear el nuevo grafo es tan sencillo como generar a partir de “hermanos” un dataframe con la misma estructura que ‘v’ y hacer un union.

La construcción del grafo es más tediosa que si lo tenemos en la base de datos como pasaría con Neo4j. Sin embargo, podemos generar todos los grafos que queramos con el mismo origen, cumpliendo cualquier criterio. En segundo lugar, las búsquedas no son tan eficientes, pero sí son más escalables, ya que se ejecutan en el cluster de Spark y pueden paralelizarse.

Finalmente una nota para la optimización: supongamos que además de los hermanos queremos otro parentesco del mismo nivel. Podemos realizar cada una en dos pasos y persistir el subgrafo:

def familiaresNivel2(g:GraphFrame):RDD[Row] = {
val grafo2 = g.find(“(a)-[r1]->(b);(b)-[r2]->(c)”)
grafo2.persist()
val hermanos = this.hermanos(grafo2)
val suegrosYernos = suegrosYYernos(grafo2)
grafo2.unpersist()
val familiaresNivel2 = hermanos.union(suegrosYernos)
return familiaresNivel2
}

def hermanos(g:DataFrame):RDD[Row] = {
return g.filter(“r1.relationship = ‘descendiente’ and r2.relationship = ‘ascendiente’ and a.id != c.id”)
.map(x => x match {
//

Deberíamos mirar todas las aristas y calcular en base a ellas si la relación es actual, pero de momento con que lo sea la original lo consideramos actual
case
Row(nodo1: Row, descendiente: Row, padre, ascendiente, hermano: Row) =>
Row(nodo1.getLong(nodo1.fieldIndex(“id”)), hermano.getLong(hermano.fieldIndex(“id”)), “hermano”, descendiente.getInt(descendiente.fieldIndex(“anio”)), false, descendiente.getBoolean(descendiente.fieldIndex(“actual”)),priority(“hermano”))
}).distinct()
}def suegrosYYernos(g:DataFrame):RDD[Row] = {
val suegrosYernos = g.filter(“r1.relationship = ‘conyuge’ and r2.relationship = ‘descendiente'”)

}

Si queréis ver un ejemplo completo, podéis hacerlo aquí: https://docs.databricks.com/spark/latest/graph-analysis/graphframes/user-guide-scala.html

En conclusión, gracias a esto se puede utilizar la potencia de un grafo sobre cualquier fuente de información, igual que lo hemos realizado sobre impala, podemos hacerlo sobre cualquier otro conector de Spark. Espero que os haya gustado, y que valoréis esta solución cuando necesitéis realizar tareas similares.