PySpark : Topic Modelling using LDA



Here I use a dataset of tweets to find the top five topics discussed, using PySpark.

Theory: Latent Dirichlet Allocation (LDA) is a generative probabilistic model that treats each document as a mixture of topics and each topic as a distribution over words. Fitting it to a corpus recovers those topic-word distributions, and the highest-weight words in each distribution give a human-readable summary of each topic.
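In standard notation, LDA's generative story is: draw a word distribution for each topic, a topic mixture for each document, and then for each word position a topic followed by a word:

\theta_d \sim \mathrm{Dirichlet}(\alpha), \quad \phi_k \sim \mathrm{Dirichlet}(\beta), \quad z_{d,n} \sim \mathrm{Multinomial}(\theta_d), \quad w_{d,n} \sim \mathrm{Multinomial}(\phi_{z_{d,n}})

Inference inverts this process: given only the observed words w, it estimates the topic mixtures theta and the topic-word distributions phi.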

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import re
from nltk.corpus import stopwords  # needs a one-time nltk.download('stopwords')
from pyspark.sql import SparkSession, SQLContext
from pyspark.ml.feature import CountVectorizer, IDF
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.clustering import LDA as MLlibLDA, LDAModel

def isEnglish(s):
    # Keep only strings that survive an ASCII round-trip, i.e. drop
    # tokens containing non-ASCII (non-English) characters.
    try:
        s.encode(encoding='utf-8').decode('ascii')
    except UnicodeDecodeError:
        return False
    else:
        return True
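A quick sanity check of the filter:

print(isEnglish("hello"))   # True
print(isEnglish("привет"))  # False: Cyrillic fails the ASCII round-trip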

spark = SparkSession.builder.appName('spark-russian-tweets').getOrCreate()

sc = spark.sparkContext

sqlContext = SQLContext(sc)

# Load all tweet CSVs from HDFS, with headers and inferred column types.
data = sqlContext.read.format("csv") \
   .options(header='true', inferschema='true') \
   .load("hdfs:///user/x/input/*.csv")
# data.take(1)
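To confirm the load, it helps to inspect the schema; the tweets are assumed to carry their text in a content column, which is what the pipeline below uses:

data.printSchema()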

Removing stopwords. To surface relevant words I used a customized stopwords list (which is very extensive, hence I am not including it here). Customizing the stopwords list is very important in topic modelling to get relevant topics.
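As an illustration only (the extra terms below are invented for this sketch, not taken from my actual list), a customized list can simply extend NLTK's base English list:

custom_stopwords = stopwords.words("english") + [
    "would", "people", "think",      # high-frequency filler (illustrative)
    "twitter", "retweet", "follow",  # platform-specific noise (illustrative)
]

The tokenization pipeline below sticks to the base NLTK list: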

# Tokenize each tweet: lowercase, split on whitespace, keep only ASCII,
# alphabetic tokens longer than 4 characters that are not stopwords,
# then pair each token list with a document index.
contents = data.rdd.map(lambda x: x['content']).filter(lambda x: x is not None)
StopWords = stopwords.words("english")
tokens = contents \
    .map(lambda document: document.strip().lower()) \
    .map(lambda document: re.split(r"\s+", document)) \
    .map(lambda words: [x for x in words if isEnglish(x)]) \
    .map(lambda words: [x for x in words if x.isalpha()]) \
    .map(lambda words: [x for x in words if len(x) > 4]) \
    .map(lambda words: [x for x in words if x not in StopWords]) \
    .zipWithIndex()

# Corpus-wide term frequencies, sorted descending; each element of tokens is
# (word_list, index), so flatMap over the word list only.
termCounts = tokens \
    .flatMap(lambda document: document[0]) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda x, y: x + y) \
    .map(lambda pair: (pair[1], pair[0])) \
    .sortByKey(False)
# print(termCounts.take(5))
# DataFrame of (token list, document index) for the ML-package featurizers.
df_txts = sqlContext.createDataFrame(tokens, ["list_of_words", "index"])

# Term-count vectors over a capped vocabulary; minDF=10.0 drops words that
# appear in fewer than 10 tweets.
cv = CountVectorizer(inputCol="list_of_words", outputCol="raw_features", vocabSize=5000, minDF=10.0)
cvmodel = cv.fit(df_txts)
result_cv = cvmodel.transform(df_txts)
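To sanity-check the vectorization, one can peek at the first row; raw_features is a sparse count vector over the learned vocabulary:

result_cv.select("index", "raw_features").show(1, truncate=False)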

# Reweight the raw counts by inverse document frequency (TF-IDF).
idf = IDF(inputCol="raw_features", outputCol="features")
idfModel = idf.fit(result_cv)
result_tfidf = idfModel.transform(result_cv)

num_topics = 5
max_iterations = 50

# MLlib's RDD-based LDA expects an RDD of [document id, term-weight vector];
# Vectors.fromML converts the ml-package vectors to their mllib counterparts.
model = MLlibLDA.train(
    result_tfidf.select("index", "features").rdd
        .mapValues(Vectors.fromML)
        .map(list),
    k=num_topics,
    maxIterations=max_iterations,
)
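The LDAModel import comes into play if you want to persist the trained model; a minimal sketch (the HDFS path is hypothetical):

model.save(sc, "hdfs:///user/x/models/tweet-lda")                # hypothetical path
sameModel = LDAModel.load(sc, "hdfs:///user/x/models/tweet-lda")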

vocabArray = cvmodel.vocabulary  # index -> word lookup for rendering topics

wordNumbers = 10
topicIndices = sc.parallelize(model.describeTopics(maxTermsPerTopic=wordNumbers))
def topic_render(topic):  # map a topic's term indices back to vocabulary words
    terms = topic[0]
    result = []
    for i in range(wordNumbers):
        term = vocabArray[terms[i]]
        result.append(term)
    return result

topics_final = topicIndices.map(lambda topic: topic_render(topic)).collect()

for topic in range(len(topics_final)):
    print("Topic " + str(topic) + ":")
    for term in topics_final[topic]:
        print(term)
    print('\n')
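describeTopics also returns each term's weight within its topic; a small variation on the loop above (same API) prints terms alongside their weights:

for i, (terms, weights) in enumerate(model.describeTopics(maxTermsPerTopic=wordNumbers)):
    print("Topic " + str(i) + ":")
    for t, w in zip(terms, weights):
        print("%s: %.4f" % (vocabArray[t], w))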