static_df = (
    spark.read
    .option('header', True)
    .option('inferSchema', True)
    .csv('/FileStore/tables/retail/2010_12_01.csv')
)
print(static_df.count())
retail_schema = static_df.schema  # keep the inferred schema to reuse for the streaming read
static_df.show(5, truncate=False)
3108
+---------+---------+-----------------------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|Description |Quantity|InvoiceDate |UnitPrice|CustomerID|Country |
+---------+---------+-----------------------------------+--------+-------------------+---------+----------+--------------+
|536365 |85123A |WHITE HANGING HEART T-LIGHT HOLDER |6 |2010-12-01 08:26:00|2.55 |17850.0 |United Kingdom|
|536365 |71053 |WHITE METAL LANTERN |6 |2010-12-01 08:26:00|3.39 |17850.0 |United Kingdom|
|536365 |84406B |CREAM CUPID HEARTS COAT HANGER |8 |2010-12-01 08:26:00|2.75 |17850.0 |United Kingdom|
|536365 |84029G |KNITTED UNION FLAG HOT WATER BOTTLE|6 |2010-12-01 08:26:00|3.39 |17850.0 |United Kingdom|
|536365 |84029E |RED WOOLLY HOTTIE WHITE HEART. |6 |2010-12-01 08:26:00|3.39 |17850.0 |United Kingdom|
+---------+---------+-----------------------------------+--------+-------------------+---------+----------+--------------+
only showing top 5 rows
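The aggregation below runs against a streaming DataFrame named retail_stream, which is not created in this excerpt. A minimal sketch of how it could be defined, reusing the retail_schema captured above; the directory path and the maxFilesPerTrigger setting are assumptions:

retail_stream = (
    spark.readStream
    .schema(retail_schema)             # streaming file sources require an explicit schema
    .option('header', True)
    .option('maxFilesPerTrigger', 1)   # assumption: consume one file per micro-batch
    .csv('/FileStore/tables/retail/')  # assumption: directory holding the daily CSV files
)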
from pyspark.sql.functions import expr, window

windowed_df = (
    retail_stream
    .withColumn('total_cost', expr('UnitPrice * Quantity'))
    .groupBy(
        window('InvoiceDate', '1 day')  # tumbling one-day windows keyed on the event-time column
    )
    .agg(
        expr('COUNT(*) AS num_purchases'),
        expr('ROUND(SUM(total_cost), 2) AS total_spent'),
        expr('ROUND(MEAN(total_cost), 2) AS avg_spent')
    )
    .sort('window')
)
print(type(windowed_df))
print(windowed_df.isStreaming)
<class 'pyspark.sql.dataframe.DataFrame'>
True
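The SQL queries below read from an in-memory table named windowed, so the streaming query must already be running against a memory sink. A minimal sketch, assuming the handle name streaming_query; complete output mode is required because the query contains an aggregation followed by a sort:

streaming_query = (
    windowed_df.writeStream
    .format('memory')        # keep results in a queryable in-memory table
    .queryName('windowed')   # table name used in the SQL queries below
    .outputMode('complete')  # rewrite the full aggregate result on every trigger
    .start()
)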
print(spark.sql('SELECT * FROM windowed').count())
spark.sql('SELECT * FROM windowed ORDER BY window DESC').show(10, truncate=False)
12
+------------------------------------------+-------------+-----------+---------+
|window |num_purchases|total_spent|avg_spent|
+------------------------------------------+-------------+-----------+---------+
|{2011-01-09 00:00:00, 2011-01-10 00:00:00}|1117 |15710.8 |14.07 |
|{2011-01-07 00:00:00, 2011-01-08 00:00:00}|1794 |27233.14 |15.18 |
|{2011-01-06 00:00:00, 2011-01-07 00:00:00}|1832 |37392.74 |20.41 |
|{2011-01-05 00:00:00, 2011-01-06 00:00:00}|1743 |-1566.23 |-0.9 |
|{2011-01-04 00:00:00, 2011-01-05 00:00:00}|1184 |14950.48 |12.63 |
|{2010-12-23 00:00:00, 2010-12-24 00:00:00}|963 |11796.31 |12.25 |
|{2010-12-22 00:00:00, 2010-12-23 00:00:00}|291 |6134.57 |21.08 |
|{2010-12-21 00:00:00, 2010-12-22 00:00:00}|1586 |47097.94 |29.7 |
|{2010-12-20 00:00:00, 2010-12-21 00:00:00}|1763 |24741.75 |14.03 |
|{2010-12-19 00:00:00, 2010-12-20 00:00:00}|522 |7517.31 |14.4 |
+------------------------------------------+-------------+-----------+---------+
only showing top 10 rows
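Each window value is a struct with start and end timestamps. If flattened columns are easier to work with, the struct fields can be selected directly; a sketch against the same windowed table:

spark.sql('''
    SELECT window.start AS window_start,
           window.end   AS window_end,
           num_purchases,
           total_spent,
           avg_spent
    FROM windowed
    ORDER BY window_start
''').show(5, truncate=False)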
import matplotlib.pyplot as plt
import matplotlib.dates as mdates

# Collect the windowed results in chronological order and plot daily spend over time
temp_pdf = spark.sql('SELECT * FROM windowed ORDER BY window').toPandas()
dates = temp_pdf.window.map(lambda x: x['start'])  # each window is a struct with 'start' and 'end'

plt.figure(figsize=[8, 4])
plt.plot(dates, temp_pdf.total_spent)
plt.gca().xaxis.set_major_locator(mdates.DayLocator())
# plt.gca().xaxis.set_major_locator(mdates.WeekdayLocator())
plt.xticks(rotation=90)
plt.show()
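When the analysis is finished, the streaming query can be stopped through its handle (assuming the streaming_query name from the sketch above):

streaming_query.stop()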