PTransform #

A data processing operation that operates on one or more PCollections.

Every PTransform:

  • Takes one or more PCollections

  • Applies a processing operation on the PCollections

  • Produces zero or more PCollections
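
A minimal sketch of a single PTransform application (Create, Map, and print are real Beam/Python constructs; the sample data is invented for illustration):

import apache_beam as beam

pipeline = beam.Pipeline()

words = pipeline | beam.Create(['a', 'b'])  # a root transform produces the first PCollection
upper = words | beam.Map(str.upper)         # a PTransform: new PCollection out, input unchanged
upper | beam.Map(print)

pipeline.run()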

Flow #

A PTransform creates a new PCollection; the input PCollection is left unchanged (PCollections are immutable).

So the flow is something like PCollection -> PTransform -> PCollection -> PTransform -> … and so on.

The flow doesn’t have to be linear; it can be arbitrarily complex.

IO transforms specify where the final write goes, including to an external sink.
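
For instance, one PCollection can feed several PTransforms, and branches can later be merged. A minimal sketch (the sample data and step names are invented for illustration):

import apache_beam as beam

pipeline = beam.Pipeline()

lines = pipeline | beam.Create(['string1\tstringA', 'string2\tstringB'])

# Branch: the same PCollection feeds two different PTransforms.
upper = lines | 'Uppercase' >> beam.Map(str.upper)
lower = lines | 'Lowercase' >> beam.Map(str.lower)

# Merge: Flatten combines multiple PCollections into one.
merged = (upper, lower) | beam.Flatten()
merged | beam.Map(print)

pipeline.run()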

Transforms #

Map #

Map applies a function to each element of a collection.

It can be implemented with a lambda function for brevity.

This is an extension of the Minimum Viable Chunk.

import apache_beam as beam

pipeline = beam.Pipeline()

outputs = (
    pipeline
    | 'Read in dataset' >> beam.io.ReadFromText('data/*.txt') 
    | 'Split the line on the tab' >> beam.Map(lambda line: line.split("\t"))
    | 'Write out' >> beam.io.WriteToText(...) 
    | 'Print the file name' >> beam.Map(print)
)

pipeline.run()

The output should look something like:

filename.txt
['string1', 'stringA']
['string2', 'stringB']
...
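
Map accepts any callable, so the lambda above could just as well be a named function. A minimal variant (split_line is a hypothetical helper, and a print step stands in for the write):

import apache_beam as beam

def split_line(line):
    # Same behavior as the lambda above.
    return line.split("\t")

pipeline = beam.Pipeline()

outputs = (
    pipeline
    | 'Read in dataset' >> beam.io.ReadFromText('data/*.txt')
    | 'Split the line on the tab' >> beam.Map(split_line)
    | 'Print' >> beam.Map(print)
)

pipeline.run()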

Filter #

Filter keeps the elements that match a specified condition and drops the rest.

This builds on the previous example from Map.

import apache_beam as beam

pipeline = beam.Pipeline()

outputs = (
    pipeline
    | 'Read in dataset' >> beam.io.ReadFromText('data/*.txt') 
    | 'Split the line on the tab' >> beam.Map(lambda line: line.split("\t"))
    # Filter
    | 'Filter elements where there is a match' >> beam.Filter(lambda line: line[0] == "string1")
    | 'Write out' >> beam.io.WriteToText(...) 
    | 'Print the file name' >> beam.Map(print)
)

pipeline.run()

FlatMap #

FlatMap, as the name implies, flattens its results. It applies a function that returns an iterable to each element, then flattens all of those iterables into a single output PCollection.
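
A minimal standalone sketch (with made-up data) of the flattening:

import apache_beam as beam

pipeline = beam.Pipeline()

(
    pipeline
    | beam.Create(['one two', 'three'])
    | beam.FlatMap(lambda s: s.split())  # emits 'one', 'two', 'three' as separate elements
    | beam.Map(print)
)

pipeline.run()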

This builds on the previous example from Filter.

import apache_beam as beam
import re

pipeline = beam.Pipeline()

outputs = (
    pipeline
    | 'Read in dataset' >> beam.io.ReadFromText('data/*.txt') 
    | 'Split the line on the tab' >> beam.Map(lambda line: line.split("\t"))
    | 'Filter elements where there is a match' >> beam.Filter(lambda line: line[0] == "string1")
    # FlatMap
    | 'Find words using regex' >> beam.FlatMap(lambda line: re.findall(r"[a-zA-Z']+", line[1]))
    | 'Write out' >> beam.io.WriteToText(...) 
    | 'Print the file name' >> beam.Map(print)
)

pipeline.run()

CombinePerKey #

CombinePerKey groups a PCollection of key-value pairs by key and performs a specified aggregation over each key's values.
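
Before extending the pipeline, here is a minimal standalone sketch (with made-up data) of a per-key sum:

import apache_beam as beam

pipeline = beam.Pipeline()

(
    pipeline
    | beam.Create([('cat', 1), ('dog', 1), ('cat', 1)])
    | beam.CombinePerKey(sum)  # emits ('cat', 2) and ('dog', 1)
    | beam.Map(print)
)

pipeline.run()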

This builds on the previous example from FlatMap.

sum #

In this example, we count the occurrences of each distinct word. We’ll pass Python’s built-in sum function to CombinePerKey. There’s some magic here.

There’s a bit of a trick: we use an extra Map step to pair each word with the number 1. This gives us a tuple with a word to combine on, and a number to add up (feels funky, but it works).

import apache_beam as beam
import re

pipeline = beam.Pipeline()

outputs = (
    pipeline
    | 'Read in dataset' >> beam.io.ReadFromText('data/*.txt') 
    | 'Split the line on the tab' >> beam.Map(lambda line: line.split("\t"))
    | 'Filter elements where there is a match' >> beam.Filter(lambda line: line[0] == "string1")
    | 'Find words using regex' >> beam.FlatMap(lambda line: re.findall(r"[a-zA-Z']+", line[1]))
    # Pair each word with a 1
    | 'Pair words with 1' >> beam.Map(lambda word: (word, 1))
    # Perform the aggregation
    | 'Count (sum up) occurrences of individual words' >> beam.CombinePerKey(sum)
    | 'Write out' >> beam.io.WriteToText(...) 
    | 'Print the file name' >> beam.Map(print)
)

pipeline.run()