Predict Trip Duration of a Passenger Using PySpark


This project implements an ensemble approach built on a regression tree and highlights how it performs compared to a plain regression tree.

Concepts Covered

  • PySpark Functions
  • Feature Engineering and generalization
  • Logarithmic transformations and skewed data
  • Ensemble models and model selection
# findspark must run before any pyspark import so that Spark can be located
import findspark
findspark.init()

import pyspark
import numpy as np
import matplotlib.pyplot as plt
from datetime import datetime

from pyspark.sql import functions as F
from pyspark.sql.functions import udf, col
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
sc = SparkContext()
sqlContext = SQLContext(sc)

train = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('C:/train.csv')
#train.show()
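
On Spark 2.x the same file can also be loaded with the built-in CSV reader instead of the com.databricks.spark.csv format string (a minimal sketch, assuming Spark 2.0 or later):

train = sqlContext.read.csv('C:/train.csv', header=True, inferSchema=True)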

Reference: stackoverflow.com

Feature Engineering

Here we use the pickup and dropoff latitude/longitude columns to generate a new trip-distance feature with the Haversine formula.
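
For reference, the distance computed by the function below is

d = 2r · arcsin( √( sin²(Δφ/2) + cos φ₁ · cos φ₂ · sin²(Δλ/2) ) )

where φ is latitude, λ is longitude (both in radians) and r is the Earth's radius.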

from math import radians, cos, sin, asin, sqrt

def haversine(lon1, lat1, lon2, lat2):
    """
    Calculate the great circle distance between two points
    on the earth (specified in decimal degrees)
    """
    # convert decimal degrees to radians
    lon1, lat1, lon2, lat2 = map(radians, [lon1, lat1, lon2, lat2])

    # haversine formula
    dlon = lon2 - lon1
    dlat = lat2 - lat1
    a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
    c = 2 * asin(sqrt(a))
    r = 6372.8 # Radius of earth in kilometers. Use 3956 for miles
    distance =  c * r
    return abs(round(distance, 2))

Spark stores data in DataFrames or RDDs. Here I am working with DataFrames, so I cannot apply a custom Python function to a column without registering it first, i.e. saving it as if it were one of the built-in SQL functions. That is where User Defined Functions (UDFs) come in.

udf_get_distance = F.udf(haversine)
train = train.withColumn('DISTANCE', udf_get_distance(
    train.pickup_longitude, train.pickup_latitude,
    train.dropoff_longitude, train.dropoff_latitude))
columns_to_drop = ['pickup_longitude', 'pickup_latitude', 'dropoff_longitude', 'dropoff_latitude']
train = train.drop(*columns_to_drop)
train.describe().show()
+-------+---------+------------------+------------------+------------------+-----------------+------------------+
|summary|       id|         vendor_id|   passenger_count|store_and_fwd_flag|    trip_duration|          DISTANCE|
+-------+---------+------------------+------------------+------------------+-----------------+------------------+
|  count|  1458644|           1458644|           1458644|           1458644|          1458644|           1458644|
|   mean|     null|1.5349502688798637|1.6645295219395548|              null|959.4922729603659|2.1365657007467114|
| stddev|     null|0.4987771539074011|1.3142421678231184|              null|5237.431724497624| 2.667889495100226|
|    min|id0000001|                 1|                 0|                 N|                1|               0.0|
|    max|id4000000|                 2|                 9|                 Y|          3526282|              9.99|
+-------+---------+------------------+------------------+------------------+-----------------+------------------+
train = train.withColumn('store_and_fwd_flag', F.when(train.store_and_fwd_flag == 'N', 0).otherwise(1))
train.take(1)
[Row(id='id2875421', vendor_id=2, pickup_datetime=datetime.datetime(2016, 3, 14, 17, 24, 55), dropoff_datetime=datetime.datetime(2016, 3, 14, 17, 32, 30), passenger_count=1, store_and_fwd_flag=0, trip_duration=455, DISTANCE='1.5')]
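
Notice that DISTANCE came back as the string '1.5': F.udf defaults to StringType when no return type is given. A sketch (not in the original notebook) of keeping the column numeric, either by declaring the return type or by casting afterwards:

from pyspark.sql.types import DoubleType

# Assumption: register the UDF with an explicit return type so DISTANCE stays numeric
udf_get_distance = F.udf(haversine, DoubleType())
# ...or cast the already-created column in place
train = train.withColumn('DISTANCE', col('DISTANCE').cast(DoubleType()))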
from pyspark.sql.types import IntegerType

split_col = F.split(train['pickup_datetime'], ' ')
train = train.withColumn('Date', split_col.getItem(0))
train = train.withColumn('Time', split_col.getItem(1))
split_date = F.split(train['Date'], '-')
train = train.withColumn('Year', split_date.getItem(0))
train = train.withColumn('Month', split_date.getItem(1))
train = train.withColumn('Day', split_date.getItem(2))
funcWeekDay = udf(lambda x: datetime.strptime(x, '%Y-%m-%d').strftime('%w'))
train = train.withColumn('weekDay', funcWeekDay(col('Date')))
split_time = F.split(train['Time'], ':')
train = train.withColumn('hour', split_time.getItem(0))
train = train.withColumn('minutes', split_time.getItem(1))
train = train.withColumn('seconds', split_time.getItem(2))
train.take(1)

[Row(id='id2875421', vendor_id=2, pickup_datetime=datetime.datetime(2016, 3, 14, 17, 24, 55), dropoff_datetime=datetime.datetime(2016, 3, 14, 17, 32, 30), passenger_count=1, store_and_fwd_flag=0, trip_duration=455, DISTANCE='1.5', Date='2016-03-14', Time='17:24:55', Year='2016', Month='03', Day='14', weekDay='1', hour='17', minutes='24', seconds='55')]
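
The same date and time features can also be derived without string splitting by using Spark's built-in datetime functions, which return integer columns directly (a sketch under the same column names; this is not what the original notebook does):

# Assumption: operate on the timestamp column directly; dayofweek requires Spark >= 2.3
train2 = (train
          .withColumn('Year', F.year('pickup_datetime'))
          .withColumn('Month', F.month('pickup_datetime'))
          .withColumn('Day', F.dayofmonth('pickup_datetime'))
          .withColumn('weekDay', F.dayofweek('pickup_datetime') - 1)  # 0 = Sunday, matching %w
          .withColumn('hour', F.hour('pickup_datetime'))
          .withColumn('minutes', F.minute('pickup_datetime'))
          .withColumn('seconds', F.second('pickup_datetime')))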
columns_to_drop = ['pickup_datetime','dropoff_datetime', 'Date','Time']
train = train.drop(*columns_to_drop)
train.show(1)
+---------+---------+---------------+------------------+-------------+--------+----+-----+---+-------+----+-------+-------+
|       id|vendor_id|passenger_count|store_and_fwd_flag|trip_duration|DISTANCE|Year|Month|Day|weekDay|hour|minutes|seconds|
+---------+---------+---------------+------------------+-------------+--------+----+-----+---+-------+----+-------+-------+
|id2875421|        2|              1|                 0|          455|       0|2016|    3| 14|      1|  17|     24|     55|
+---------+---------+---------------+------------------+-------------+--------+----+-----+---+-------+----+-------+-------+
only showing top 1 row
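
The split-based columns (and DISTANCE) are still strings at this point, yet the schema printed further below shows them as integers, so a cast step was presumably applied between these cells. A minimal sketch of that step:

# Assumption: cast the string-typed derived columns so they can go into a numeric feature vector
for c in ['DISTANCE', 'Year', 'Month', 'Day', 'weekDay', 'hour', 'minutes', 'seconds']:
    train = train.withColumn(c, col(c).cast(IntegerType()))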

The target variable (trip_duration) is heavily right-skewed, with a long tail of very long trips, so we apply a logarithmic transformation to bring it closer to a normal distribution.

from pyspark.sql.functions import log
train = train.withColumn("log_duration", log(train["trip_duration"]) )
train.show(1)
+---------+---------+---------------+------------------+-------------+--------+----+-----+---+-------+----+-------+-------+----------------+
|       id|vendor_id|passenger_count|store_and_fwd_flag|trip_duration|DISTANCE|Year|Month|Day|weekDay|hour|minutes|seconds|    log_duration|
+---------+---------+---------------+------------------+-------------+--------+----+-----+---+-------+----+-------+-------+----------------+
|id2875421|        2|              1|                 0|          455|       1|2016|    3| 14|      1|  17|     24|     55|6.12029741895095|
+---------+---------+---------------+------------------+-------------+--------+----+-----+---+-------+----+-------+-------+----------------+
only showing top 1 row
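
Because the model will be trained on log_duration, its predictions come out on the log scale; exponentiating maps them back to seconds. A quick sanity check on the existing columns (a sketch):

from pyspark.sql.functions import exp

# exp(log_duration) should reproduce trip_duration up to floating-point error
train.select('trip_duration', exp(col('log_duration')).alias('duration_check')).show(1)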
train.cache()
train.printSchema()
root
 |-- id: string (nullable = true)
 |-- vendor_id: integer (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- store_and_fwd_flag: integer (nullable = false)
 |-- trip_duration: integer (nullable = true)
 |-- DISTANCE: integer (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- Day: integer (nullable = true)
 |-- weekDay: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- minutes: integer (nullable = true)
 |-- seconds: integer (nullable = true)
 |-- log_duration: double (nullable = true)
train.describe().toPandas()

summary id vendor_id passenger_count store_and_fwd_flag trip_duration DISTANCE Year Month Day weekDay hour minutes seconds
0 count 1458644 1458644 1458644 1458644 1458644 1458644 1458644 1458644 1458644 1458644 1458644 1458644 1458644
1 mean None 1.5349502688798637 1.6645295219395548 1.0 959.4922729603659 3.4408639325291177 2016.0 3.516817674497684 15.504018115455176 3.1128177951576945 13.60648451575573 29.59015770811795 29.473590540255195
2 stddev None 0.4987771539074011 1.3142421678231184 0.0 5237.431724497624 4.296542880941734 0.0 1.6810375087348843 8.703135115281617 1.9928049699468888 6.399692034352387 17.324714120895614 17.319851679258015
3 min id0000001 1 0 1 1 0.0 2016 1 1 0 0 0 0
4 max id4000000 2 9 1 3526282 97.59 2016 6 31 6 23 59 59
vectorAssembler = VectorAssembler(
    inputCols=['vendor_id', 'passenger_count', 'store_and_fwd_flag', 'DISTANCE',
               'Year', 'Month', 'Day', 'weekDay', 'hour', 'minutes', 'seconds'],
    outputCol='features')
vtrain = vectorAssembler.transform(train)
vtrain = vtrain.select(['features', 'log_duration'])
vtrain.show(3)
+--------------------+------------------+
|            features|      log_duration|
+--------------------+------------------+
|[2.0,1.0,0.0,1.0,...|  6.12029741895095|
|[1.0,1.0,0.0,1.0,...|6.4967749901858625|
|[2.0,1.0,0.0,6.0,...|  7.66105638236183|
+--------------------+------------------+
only showing top 3 rows
splits = vtrain.randomSplit([0.8, 0.2])
train_df = splits[0]
test_df = splits[1]
dt = DecisionTreeRegressor(featuresCol='features', labelCol='log_duration', maxDepth=2)
#dt_model = dt.fit(train_df)
#dt_predictions = dt_model.transform(train_df)
dt_evaluator = RegressionEvaluator(labelCol="log_duration", predictionCol="prediction", metricName="rmse")
#rmse = dt_evaluator.evaluate(dt_predictions)
#print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Parameter Tuning in PySpark

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

dtparamGrid = (ParamGridBuilder()
               .addGrid(dt.maxDepth, [2, 3, 5])
               .addGrid(dt.maxBins, [4, 8, 32])
               .build())

dtcv = CrossValidator(estimator = dt,
                      estimatorParamMaps = dtparamGrid,
                      evaluator = dt_evaluator,
                      numFolds = 10)

# Run cross validations
dtcvModel = dtcv.fit(train_df)
print(dtcvModel)
dtpredictions = dtcvModel.transform(test_df)
mae = dt_evaluator.evaluate(dtpredictions, {dt_evaluator.metricName: "mae"})
print(" Mean Absolute Error (MAE) on test data = %g" % mae)
 Mean Absolute Error (MAE) on test data = 0.340612
rmse = dt_evaluator.evaluate(dtpredictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)
Root Mean Squared Error (RMSE) on test data = 0.513194
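
The cross-validator keeps the best maxDepth/maxBins combination internally; it can be inspected before moving on (a sketch):

print(dtcvModel.avgMetrics)                 # mean RMSE for each grid combination
best_tree = dtcvModel.bestModel
print(best_tree.depth, best_tree.numNodes)  # structure of the winning tree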
dtpredictions_ert = dtcvModel.transform(train_df)

dtpredictions_ert.take(2)
[Row(features=SparseVector(11, {0: 1.0, 1: 2.0, 4: 2016.0, 5: 2.0, 6: 14.0, 9: 17.0}), log_duration=5.3230099791384085, prediction=5.341284741186751),
 Row(features=SparseVector(11, {0: 2.0, 1: 1.0, 4: 2016.0, 5: 5.0, 6: 15.0, 9: 15.0}), log_duration=5.814130531825066, prediction=5.341284741186751)]
dtpredictions.take(2)
[Row(features=SparseVector(11, {0: 2.0, 1: 1.0, 4: 2016.0, 5: 2.0, 6: 28.0, 9: 23.0}), log_duration=5.493061443340548, prediction=5.711890426639655),
 Row(features=SparseVector(11, {0: 2.0, 1: 2.0, 4: 2016.0, 5: 2.0, 6: 7.0, 9: 55.0}), log_duration=5.54907608489522, prediction=5.711890426639655)]
pred_table = dtpredictions.groupBy('prediction').count()
pred_table = pred_table.drop('count')
pred_table.show()
+------------------+
|        prediction|
+------------------+
| 6.284032803395688|
|6.0485785145505995|
| 5.946427673262689|
| 6.141958153601273|
| 6.551517246541201|
|7.1425228143753525|
| 7.545394329958232|
| 7.298411167160363|
|5.7496543843581795|
| 7.924025245291648|
|6.7242448681574745|
+------------------+
only showing top 20 rows
pred_table1 = dtpredictions_ert.groupBy('prediction').count()
pred_table1.show()
predlist = pred_table.select("prediction").collect()
predlist = pred_table.select("prediction").rdd.flatMap(lambda x: x).collect()
predlist[0]
6.284032803395688
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit

lr = LinearRegression(featuresCol='features', labelCol='log_duration', maxIter=10)
#lr_model = lr.fit(newdf)
evaluator = RegressionEvaluator(labelCol="log_duration", predictionCol="prediction", metricName="rmse")


paramGrid = ParamGridBuilder()\
    .addGrid(lr.regParam, [0.1, 0.01, 0.3, 0.5, 1]) \
    .addGrid(lr.fitIntercept, [False, True])\
    .addGrid(lr.elasticNetParam, [0.1, 0.2, 0.5, 0.8, 1.0])\
    .build()

tvs = TrainValidationSplit(estimator=lr,
                           estimatorParamMaps=paramGrid,
                           evaluator=evaluator,
                           trainRatio=0.8)


maes = []
for i in predlist:
    # For each decision-tree leaf (identified by its prediction value), fit a separate
    # linear regression on the training rows in that leaf and score it on the matching test rows
    newdf = dtpredictions.filter(dtpredictions.prediction == i).drop('prediction')
    newdft = dtpredictions_ert.filter(dtpredictions_ert.prediction == i).drop('prediction')
    model = tvs.fit(newdft)
    lr_predictions = model.transform(newdf)
    mae = dt_evaluator.evaluate(lr_predictions, {dt_evaluator.metricName: "mae"})
    maes.append(mae)
    print("MAE on test data = %g" % mae)

print("Average MAE = %g" % (sum(maes) / len(maes)))

Average MAE = 0.32