Słyszałeś/aś o Apache Graphframes? Teoria grafów to nie tylko ich własności, ale i algorytmy. Przekształcenie danych do modelu grafowego umożliwia zastosowanie niektórych z nich. W tym wpisie pobierzemy dane “followersów” z Twittera i wykonamy parę prostych algorytmów w Graphframes w PySparku.
Pobranie danych przez Tweepy
Potrzebujemy danych do grafu. Napisałem skrypt w Pytonie, który na podstawie zalogowanego konta pobiera jego znajomych i znajomych jego znajomych… do pewnej głębokości.
Aby korzystać z API Twittera, trzeba założyć konto deweloperskie. W przypadku pythona wygodniej użyć biblioteki Tweepy niż ręcznie puszczać zapytania.
import tweepy
import csv
class FollowersFinder:
def __init__(self, consumer_key, consumer_secret, access_token, access_secret):
self.scanned_accounts = set()
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_secret)
self.api = tweepy.API(auth, wait_on_rate_limit=True, wait_on_rate_limit_notify=True)
def start(self, depth = 3):
self.max_depth = depth
account_self_id = self.api.me().id
self.append_account(account_self_id)
self.find_relationships(account_self_id)
def find_relationships(self, account_id, depth = 0):
if (account_id in self.scanned_accounts):
return
if (depth > self.max_depth):
return
try:
followers_ids = self.api.followers_ids(account_id)
friends_ids = self.api.friends_ids(account_id)
except tweepy.TweepError:
print("Failed to run the command on that user, Skipping...")
return
self.scanned_accounts.add(account_id)
if (len(followers_ids) > 10000 or len(friends_ids) > 10000):
return
for follower_id in followers_ids:
self.append_account(follower_id)
self.append_relationship((follower_id,account_id))
self.find_relationships(follower_id,depth + 1)
for friend_id in friends_ids:
self.append_account(friend_id)
self.append_relationship((account_id,friend_id))
self.find_relationships(friend_id,depth + 1)
def append_account(self, value):
with open('accounts.csv', mode='a', newline='') as employee_file:
employee_writer = csv.writer(employee_file, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
employee_writer.writerow([value])
def append_relationship(self, value):
with open('relationships.csv', mode='a', newline='') as employee_file:
employee_writer = csv.writer(employee_file, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
employee_writer.writerow(value)
consumer_key = 'srutu'
consumer_secret = 'tutu'
access_token = 'majtki'
access_secret = 'z drutu'
finder = FollowersFinder(consumer_key, consumer_secret, access_token, access_secret)
finder.start(3)
Metoda wykonywana jest rekurencyjnie. Opakowałem to wszystko w klasę. Jakoś łatwiej mi wtedy ogarnąć kod. Dane wrzucane są bezpośrednio do csv-ki. Darowałem sobie sprawdzanie duplikatów, można to zrobić na poziomie spark-a. W związku z ograniczeniami korzystania z API warto pamiętać o wait_on_rate_limit=True, wait_on_rate_limit_notify=True, oraz try .. catch przy używaniu tweepy. Limity i brak uprawnień może wywalić nam skrypt.
Struktura csv-ek jest minimalna. Wierzchołki to same ID. Krawędzie to ID “od” i ID “do”.
Po około 12 godzinach działania skryptu przerwałem go. Zebrałem niecałe 400k kont, co dało około 15 MB.
PySpark + Jupyter + Graphframes
Aby odpalić Jupyter z pysparkiem i załadowaną biblioteką Graphfarmes trzeba:
- Zainstalować OpenJDK 1.8
- Zainstalować Sparka
- Ustawić zmienną SPARK_HOME
- Dodać SPARK_HOME/bin do PATH-a
- Ustawić PYSPARK_DRIVER_PYTHON i PYSPARK_DRIVER_PYTHOn_OPTS jak poniżej
export SPARK_HOME=/home/maciej/spark-2.4.4-bin-hadoop2.7
export HADOOP_HOME=/home/maciej/hadoop-3.2.1
export PATH="$SPARK_HOME/bin:$PATH"
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS='notebook'
Teraz poniższa komenda odpali nam Jupytera, pysparka i dołączy bibliotekę z Graphframes
pyspark --packages graphframes:graphframes:0.7.0-spark2.4-s_2.11
Utworzenie grafu
Konstruktor Graphframes potrzebuje dwóch parametrów. DataFrame z wierzchołkami (kolumna “id”) oraz krawędziami (kolumny “src”, “dst” i “relationship”. Zwróć uwagę na to, że są to pojedyncze DataFrame. Jeśli mylisz o modelu grafowym, w którym wierzchołki mają różne schematy, musisz sprowadzić je do jednego DataFrame. Przykład jak to zrobić wkrótce na Wiadro Danych ?.
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import graphframes as G
spark = SparkSession\
.builder\
.appName("tweets")\
.getOrCreate()
accounts = spark.read.csv("accounts_followers.csv")
relationships = spark.read.csv("relationships_followers.csv")
vertices = accounts.select(F.col("_c0").alias("id")).distinct()
edges = relationships\
.select(
F.col("_c0").alias("src"),\
F.col("_c1").alias("dst"),\
F.lit("follows").alias("relationship"))\
.distinct()
g = G.GraphFrame(vertices, edges)
Pozostaje nam tylko pobawić się możliwościami Graphframes.
PageRank i trójkąty
Ciekawym algorytmem jest PageRank opracowany przez założycieli Google. Okazuje się, że największy ranking PageRank ma franz_prg, andrzejkrzywda, konradkokosa. Widocznie najlepiej ćwierkają ?.
Liczenie trójkątów również pozwala oszacować “popularność” węzłów w grafie. Tym razem wygrał konradkokosa.
Szukanie ścieżki
Jako że jest to graf, znajdźmy najkrótszą ścieżkę między dwoma wierzchołkami. Odpowiada za to metoda shortestPaths.
Jak widać na screenie, metoda ta bierze pod uwagę kierunek krawędzi. Zwraca tylko dystans. Nie dowiemy się jak przebiega ta ścieżka.
Kolejna metoda to bfs (Breadth-first search). Sprawdźmy, czy faktycznie od wierzchołka 278718784 do 1189279208150720512 nie znajdziemy ścieżki.
W porównaniu do shortestPaths, w tym przypadku dostajemy informacje, którymi wierzchołkami i krawędziami przebiega ścieżka.
Co dalej?
To był tylko ułamek tego, co umożliwia Graphframes. Jak znajdę jakieś ciekawe dane, z pewnością wrócę do tej biblioteki. Kolejne kwestie warte poruszenia to wizualizacja i integracja np. z Neo4j.
Grafy można już przetwarzać na zwykłym MSSQL i jest to nawet wydajne. Mam tam taki graf z 18 mln krawędzi i całkiem sprawnie sie to przetwarza.
Słyszałem o grafach w MSSQL. Nie miałem okazji głębiej wgryźć się w temat. Od znajomego słyszałem, że “szału nie ma”, ale jak widać zdania są podzielone. Wydaje mi się,że większość algorytmów trzeba tam klepać ręcznie (choćby taki PageRank https://stackoverflow.com/questions/17787944/sql-pagerank-implementation ). Jakie operacje tam wykonujesz?
To co pokazałeś w linku to zwykły goły SQL na relacyjnych tablicach
Najkrótsza ścieżka jest. Bardzo łatwy język zapytań.
No ale aby zrobić coś bardziej skomplikowanego, to trzeba się trochę nauczyć.
Agregacje są.
https://github.com/microsoft/sql-server-samples/tree/master/samples/features/sql-graph
pytanie totalnego laika – czemu Spark? Albo inaczej – kiedy Spark jest strzelaniem z armaty do wróbla?
Masz jakieś rady dla kogoś, kto chce się nauczyć korzystania ze Sparka? A bardziej konkretnie – czy Spark do nauki na lokalnej maszynie ma sens? (bo np. Kafka chyba nie)
To czy używać Sparka czy nie wg. mnie zależy od rozmiaru danych ( https://www.quora.com/When-should-you-use-pandas-SciPy-NumPy-scikit-learn-mahout-and-spark-When-would-you-choose-one-over-the-other tutaj ktoś się wypowiedział na ten temat ). Np. na danych z tego wpisu https://wiadrodanych.pl/big-data/hdfs/problem-malych-plikow-w-hdfs/ ewidentnie widać, że hdfs + spark to zły pomysł (przynajmniej w tej formie). Co do nauki sparka, kafki czy innej technologii wg mnie lokalna maszyna ma sens. Nie trzeba mielić gigabajtów by sprawdzić jak coś działa 🙂 Jeśli potrzebujesz rozproszenia, możesz zasymulować wiele instancji np. używając dockera.