39 - Windowing (Python)


Lesson 39 - Windowing

import time
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from pyspark.sql.functions import col, expr, window

Event Time and Processing Time

For many types of streaming data, there are two relevant times associated with each record received over the stream. These are the record's event time and its processing time.

  • The event time for a record refers to the time when the record was generated. For example, this might be the time when a user request was submitted, an order was placed, a financial transaction was initiated, or when a sensor reading was generated. The event time is not relevant for every type of data, but when it is needed, it should be included as part of the record itself.
  • The processing time for a record is the time at which the stream-processing system actually receives it. We are usually less interested in the processing time than in the event time (see the sketch below).
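To make the distinction concrete, here is a minimal sketch; the streaming DataFrame events and its event_time column are hypothetical stand-ins, not part of this lesson's dataset. It tags each record with the current processing time using current_timestamp() and computes the lag between the two times.

from pyspark.sql.functions import col, current_timestamp
 
# Hypothetical streaming DataFrame with a timestamp column named 'event_time'.
events_with_lag = (
    events
    .withColumn('processing_time', current_timestamp())   # when Spark processes the record
    .withColumn('lag_seconds',
                col('processing_time').cast('long') - col('event_time').cast('long'))
)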

Windowing

There are often occasions when we wish to group records received over a stream into intervals of time based on their event time. Such intervals are referred to as windows. Structured Streaming makes it easy to define event-time windows through the window() function, which can be imported from pyspark.sql.functions. This function takes two required arguments: the name of the column containing the event-time data, and a string specifying the desired window length. Examples of valid duration strings include '1 day', '1 hour', '1 minute', and '15 minutes'. An optional third argument can be supplied to create sliding windows, but in this lesson we will use only fixed (tumbling) windows.
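As a quick sketch of the syntax, suppose we had a streaming DataFrame named events with a timestamp column event_time (both hypothetical, not part of this lesson's dataset). Counting records in fifteen-minute tumbling windows would look like this:

counts_per_window = (
    events
    .groupBy(window('event_time', '15 minutes'))   # tumbling fifteen-minute windows
    .agg(expr('COUNT(*) AS num_events'))
)
 
# The resulting 'window' column is a struct with 'start' and 'end' timestamps.
# Passing a third duration, e.g. window('event_time', '15 minutes', '5 minutes'),
# would instead produce overlapping (sliding) windows.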

Retail Data

To demonstrate the use of windowing, we will return to the retail dataset from the previous lesson. Recall that this dataset is stored in several CSV files located in the directory /FileStore/tables/retail/, with each file containing the purchases for a single day. To get a sense of what the data looks like, and to determine the schema we should use, we will first read the contents of a single file into a static DataFrame.

static_df = (
    spark.read
    .option('header', True)         # the first line of the file contains column names
    .option('inferSchema', True)    # infer column types from the data
    .csv('/FileStore/tables/retail/2010_12_01.csv')
)
 
print(static_df.count())
 
retail_schema = static_df.schema
 
static_df.show(5, truncate=False)
3108
+---------+---------+-----------------------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|Description                        |Quantity|InvoiceDate        |UnitPrice|CustomerID|Country       |
+---------+---------+-----------------------------------+--------+-------------------+---------+----------+--------------+
|536365   |85123A   |WHITE HANGING HEART T-LIGHT HOLDER |6       |2010-12-01 08:26:00|2.55     |17850.0   |United Kingdom|
|536365   |71053    |WHITE METAL LANTERN                |6       |2010-12-01 08:26:00|3.39     |17850.0   |United Kingdom|
|536365   |84406B   |CREAM CUPID HEARTS COAT HANGER     |8       |2010-12-01 08:26:00|2.75     |17850.0   |United Kingdom|
|536365   |84029G   |KNITTED UNION FLAG HOT WATER BOTTLE|6       |2010-12-01 08:26:00|3.39     |17850.0   |United Kingdom|
|536365   |84029E   |RED WOOLLY HOTTIE WHITE HEART.     |6       |2010-12-01 08:26:00|3.39     |17850.0   |United Kingdom|
+---------+---------+-----------------------------------+--------+-------------------+---------+----------+--------------+
only showing top 5 rows

In the cell below, we create a streaming DataFrame that will read the files in the /FileStore/tables/retail/ directory one at a time.

retail_stream = (
    spark.readStream
    .option('header', True)
    .option('maxFilesPerTrigger', 1)    # read one file per micro-batch to simulate a stream
    .schema(retail_schema)              # reuse the schema inferred from the static DataFrame
    .csv('/FileStore/tables/retail/')
)
 
print(retail_stream.isStreaming)
True

We will now use the window() function to group records into buckets representing one day of purchases.

windowed_df = (
    retail_stream
    .withColumn('total_cost', expr('UnitPrice * Quantity'))
    .groupBy(
        window('InvoiceDate', '1 day')    # tumbling one-day windows based on the event time
    )
    .agg(
        expr('COUNT(*) AS num_purchases'),
        expr('ROUND(SUM(total_cost), 2) AS total_spent'),
        expr('ROUND(MEAN(total_cost), 2) AS avg_spent')
    )
    .sort('window')
)
 
print(type(windowed_df))
print(windowed_df.isStreaming)
<class 'pyspark.sql.dataframe.DataFrame'>
True

We will now start the stream. The results will be written to an in-memory sink.

writer = (
    windowed_df
    .writeStream
    .format('memory')              # write results to an in-memory table
    .queryName('windowed')         # the in-memory table will be named 'windowed'
    .outputMode('complete')        # rewrite the full aggregation result on each trigger
)
 
query = writer.start()
print(query.isActive)
True
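As an aside (not shown in the original lesson output), a running StreamingQuery also exposes its current state and the metrics of its most recent micro-batch:

print(query.status)          # e.g. whether a micro-batch is running or the query is waiting for data
print(query.lastProgress)    # statistics for the most recently completed micro-batch

Once a few micro-batches have completed, we can query the in-memory windowed table directly with Spark SQL.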
print(spark.sql('SELECT * from windowed').count())
spark.sql('SELECT * from windowed ORDER BY window DESC').show(10, truncate=False)
12
+------------------------------------------+-------------+-----------+---------+
|window                                    |num_purchases|total_spent|avg_spent|
+------------------------------------------+-------------+-----------+---------+
|{2011-01-09 00:00:00, 2011-01-10 00:00:00}|1117         |15710.8    |14.07    |
|{2011-01-07 00:00:00, 2011-01-08 00:00:00}|1794         |27233.14   |15.18    |
|{2011-01-06 00:00:00, 2011-01-07 00:00:00}|1832         |37392.74   |20.41    |
|{2011-01-05 00:00:00, 2011-01-06 00:00:00}|1743         |-1566.23   |-0.9     |
|{2011-01-04 00:00:00, 2011-01-05 00:00:00}|1184         |14950.48   |12.63    |
|{2010-12-23 00:00:00, 2010-12-24 00:00:00}|963          |11796.31   |12.25    |
|{2010-12-22 00:00:00, 2010-12-23 00:00:00}|291          |6134.57    |21.08    |
|{2010-12-21 00:00:00, 2010-12-22 00:00:00}|1586         |47097.94   |29.7     |
|{2010-12-20 00:00:00, 2010-12-21 00:00:00}|1763         |24741.75   |14.03    |
|{2010-12-19 00:00:00, 2010-12-20 00:00:00}|522          |7517.31    |14.4     |
+------------------------------------------+-------------+-----------+---------+
only showing top 10 rows
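Finally, we pull the current contents of the windowed table into a pandas DataFrame and plot the total amount spent in each daily window against the window's start date.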
temp_pdf = spark.sql('SELECT * from windowed').toPandas()
# The window column is a struct; extract the start timestamp of each window.
dates = temp_pdf.window.map(lambda x: x['start'])
 
plt.figure(figsize=[8, 4])
plt.plot(dates, temp_pdf.total_spent)
plt.gca().xaxis.set_major_locator(mdates.DayLocator())
#plt.gca().xaxis.set_major_locator(mdates.WeekdayLocator())
 
plt.xticks(rotation=90)
plt.show()
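When we are finished, the streaming query should be stopped so that it does not continue running in the background:

# Shut down the streaming query and release its resources.
query.stop()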