PySpark : Topic Modelling using LDA
I use a corpus of tweets here to find the top 5 topics discussed, using PySpark.
Theory:
Latent Dirichlet Allocation (LDA) is a generative probabilistic model for text: each document is treated as a mixture of latent topics, and each topic as a distribution over words, so the probability of word w in document d decomposes as p(w|d) = Σ_k p(w | topic k) · p(topic k | d). Fitting LDA to the tweet corpus recovers topic-word distributions whose top-weighted words summarize what each topic is about.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import re
from nltk.corpus import stopwords
from pyspark.sql import SparkSession
from pyspark.ml.feature import CountVectorizer, IDF
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.clustering import LDA as MLlibLDA
from pyspark.mllib.clustering import LDAModel
def isEnglish(s):
    # Heuristic: a token that survives an ASCII round-trip contains no
    # non-ASCII characters, which filters out non-English (and emoji) tokens.
    try:
        s.encode(encoding='utf-8').decode('ascii')
    except UnicodeDecodeError:
        return False
    else:
        return True
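# Example behaviour: isEnglish("hello") is True, while isEnglish("привет")
# is False, since the Cyrillic bytes fail the ASCII decode.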
spark = SparkSession.builder.appName('spark-russian-tweets').getOrCreate()
sc = spark.sparkContext

data = spark.read.format("csv") \
    .options(header='true', inferschema='true') \
    .load("hdfs:///user/x/input/*.csv")
# data.take(1)
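# (Optional sanity check, assumed here, not part of the original pipeline)
# inspect the inferred schema before processing; the 'content' column used
# below is expected to hold the raw tweet text.
# data.printSchema()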
Removing Stopwords
To get relevant words, I used a customized stopwords list (which is very extensive, hence I am not including it; an illustrative sketch of the customization step follows the base list below). Customizing the stopwords list is very important in topic modelling to get relevant topics:
contents = data.rdd.map(lambda x: x['content']).filter(lambda x: x is not None)
StopWords = stopwords.words("english")
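# A minimal sketch of the customization step: extend the base NLTK list
# with domain-specific terms. These terms are illustrative stand-ins for
# the author's actual (much larger) list.
StopWords = StopWords + ["twitter", "retweet", "https", "follow"]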
tokens = (contents
    .map(lambda document: document.strip().lower())               # normalize case
    .map(lambda document: re.split(" ", document))                # tokenize on spaces
    .map(lambda words: [x for x in words if isEnglish(x)])        # drop non-ASCII tokens
    .map(lambda words: [x for x in words if x.isalpha()])         # keep alphabetic tokens
    .map(lambda words: [x for x in words if len(x) > 4])          # drop short tokens
    .map(lambda words: [x for x in words if x not in StopWords])  # drop stopwords
    .zipWithIndex())                                              # attach document ids
termCounts = (tokens
    .flatMap(lambda document: document[0])     # words only; drop the index
    .map(lambda word: (word, 1))
    .reduceByKey(lambda x, y: x + y)
    .map(lambda pair: (pair[1], pair[0]))      # (count, word) so we can sort by count
    .sortByKey(False))                         # most frequent first
# print(termCounts.take(5))
df_txts = spark.createDataFrame(tokens, ["list_of_words", "index"])

# Term frequencies: cap the vocabulary at 5000 terms and require each term
# to appear in at least 10 documents.
cv = CountVectorizer(inputCol="list_of_words", outputCol="raw_features", vocabSize=5000, minDF=10.0)
cvmodel = cv.fit(df_txts)
result_cv = cvmodel.transform(df_txts)

# Down-weight terms that appear across many documents.
idf = IDF(inputCol="raw_features", outputCol="features")
idfModel = idf.fit(result_cv)
result_tfidf = idfModel.transform(result_cv)
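# Note: LDA is classically trained on raw term counts; feeding the TF-IDF
# "features" column to LDA below is a modelling choice, and the plain
# "raw_features" counts could be selected instead.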
num_topics = 5
max_iterations = 50

# pyspark.mllib's LDA expects an RDD of [document id, vector] pairs with
# mllib (not ml) vectors, hence the Vectors.fromML conversion.
model = MLlibLDA.train(
    result_tfidf.select("index", "features").rdd
        .mapValues(Vectors.fromML)
        .map(list),
    k=num_topics,
    maxIterations=max_iterations
)
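# A minimal sketch of persisting and reloading the trained model (this is
# what the LDAModel import above is for); the HDFS path is illustrative.
model.save(sc, "hdfs:///user/x/models/tweet-lda")
sameModel = LDAModel.load(sc, "hdfs:///user/x/models/tweet-lda")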
vocabArray = cvmodel.vocabulary
wordNumbers = 10  # top terms to extract per topic
topicIndices = sc.parallelize(model.describeTopics(maxTermsPerTopic=wordNumbers))
def topic_render(topic):
    # describeTopics returns (term indices, term weights) pairs; map the
    # vocabulary ids of the top terms back to actual words.
    terms = topic[0]
    result = []
    for i in range(wordNumbers):
        term = vocabArray[terms[i]]
        result.append(term)
    return result
topics_final = topicIndices.map(lambda topic: topic_render(topic)).collect()
for topic in range(len(topics_final)):
    print("Topic " + str(topic))
    for term in topics_final[topic]:
        print(term)
    print('\n')