PTransform #
A data processing operation that operates on one or more PCollections.
Every PTransform:
- Takes one or more PCollections
- Applies a processing operation on the PCollections
- Produces zero or more PCollections
Flow #
A PTransform creates a new PCollection. The input PCollection is left unchanged.
So the flow is something like PCollection -> PTransform -> PCollection -> PTransform -> … and so on.
The flow doesn’t have to be linear; it can be arbitrarily complex.
IO transforms specify where the final write goes, including to an external sink.
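To make the non-linear case concrete, here's a minimal sketch of a branching flow, using beam.Create with made-up in-memory data instead of a file read: one input PCollection feeds two independent Map branches, which are then merged back together with Flatten.
import apache_beam as beam

pipeline = beam.Pipeline()
# One input PCollection feeds two independent branches.
lines = pipeline | 'Create input' >> beam.Create(['alpha\tbeta', 'gamma\tdelta'])
# Branch 1: keep the first column.
firsts = lines | 'First column' >> beam.Map(lambda line: line.split("\t")[0])
# Branch 2: keep the second column.
seconds = lines | 'Second column' >> beam.Map(lambda line: line.split("\t")[1])
# Branches can be merged back into one PCollection with Flatten.
merged = (firsts, seconds) | 'Merge branches' >> beam.Flatten()
merged | 'Print' >> beam.Map(print)
pipeline.run()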
Transforms #
Map #
Map applies a function to each element of a PCollection.
The function can be written as a lambda for brevity.
This is an extension of the Minimum Viable Chunk.
import apache_beam as beam

pipeline = beam.Pipeline()
outputs = (
    pipeline
    | 'Read in dataset' >> beam.io.ReadFromText('data/*.txt')
    | 'Split the line on the tab' >> beam.Map(lambda line: line.split("\t"))
    | 'Write out' >> beam.io.WriteToText(...)
    | 'Print the file name' >> beam.Map(print)
)
pipeline.run()
The output should look something like this, where the first line is the name of the file written by WriteToText and the lists are its contents:
filename.txt
['string1', 'stringA']
['string2', 'stringB']
...
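For reference, the split step can also use a named function instead of a lambda. Here's a minimal sketch on in-memory data; split_line is a made-up helper name, not part of Beam.
import apache_beam as beam

# split_line is a hypothetical helper, equivalent to the lambda above.
def split_line(line):
    return line.split("\t")

pipeline = beam.Pipeline()
outputs = (
    pipeline
    | 'Create lines' >> beam.Create(['string1\tstringA', 'string2\tstringB'])
    | 'Split the line on the tab' >> beam.Map(split_line)
    | 'Print' >> beam.Map(print)
)
pipeline.run()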
Filter #
Filter lets us keep the elements that match a specified condition and drop the rest.
This builds on the previous example from Map.
import apache_beam as beam

pipeline = beam.Pipeline()
outputs = (
    pipeline
    | 'Read in dataset' >> beam.io.ReadFromText('data/*.txt')
    | 'Split the line on the tab' >> beam.Map(lambda line: line.split("\t"))
    # Filter: keep only rows whose first field is "string1"
    | 'Filter elements where there is a match' >> beam.Filter(lambda line: line[0] == "string1")
    | 'Write out' >> beam.io.WriteToText(...)
    | 'Print the file name' >> beam.Map(print)
)
pipeline.run()
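Filter also accepts a named predicate function. Here's a minimal standalone sketch on in-memory rows shaped like the split lines above; starts_with_string1 is a made-up name.
import apache_beam as beam

# starts_with_string1 is a hypothetical predicate; elements where it
# returns False are dropped from the output PCollection.
def starts_with_string1(fields):
    return fields[0] == "string1"

pipeline = beam.Pipeline()
outputs = (
    pipeline
    | 'Create rows' >> beam.Create([['string1', 'stringA'], ['string2', 'stringB']])
    | 'Keep string1 rows' >> beam.Filter(starts_with_string1)
    | 'Print' >> beam.Map(print)  # only ['string1', 'stringA'] survives
)
pipeline.run()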
FlatMap #
FlatMap, as the name implies, flattens its results: it applies a function that returns an iterable to each element, then emits every item of those iterables as a separate element of a single output PCollection.
This builds on the previous example from Filter.
import apache_beam as beam
import re

pipeline = beam.Pipeline()
outputs = (
    pipeline
    | 'Read in dataset' >> beam.io.ReadFromText('data/*.txt')
    | 'Split the line on the tab' >> beam.Map(lambda line: line.split("\t"))
    | 'Filter elements where there is a match' >> beam.Filter(lambda line: line[0] == "string1")
    # FlatMap: each list of regex matches is flattened into individual word elements
    | 'Find distinct words using regex' >> beam.FlatMap(lambda line: re.findall(r"[a-zA-Z']+", line[1]))
    | 'Write out' >> beam.io.WriteToText(...)
    | 'Print the file name' >> beam.Map(print)
)
pipeline.run()
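To see the flattening in isolation, here's a minimal sketch contrasting Map and FlatMap on made-up in-memory lines: Map emits one list per input line, while FlatMap unpacks each list into individual word elements.
import apache_beam as beam

pipeline = beam.Pipeline()
lines = pipeline | 'Create lines' >> beam.Create(['a b', 'c d e'])
# Map: two output elements, each a list -> ['a', 'b'] and ['c', 'd', 'e']
lines | 'Map split' >> beam.Map(lambda line: line.split()) | 'Print lists' >> beam.Map(print)
# FlatMap: five output elements -> 'a', 'b', 'c', 'd', 'e'
lines | 'FlatMap split' >> beam.FlatMap(lambda line: line.split()) | 'Print words' >> beam.Map(print)
pipeline.run()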
CombinePerKey #
CombinePerKey groups the values for each key and combines them with a specified aggregation function.
This builds on the previous example from FlatMap.
sum #
In this example, we count up occurrences of distinct words using Python's built-in sum
function. There's a bit of a trick here: we use an extra Map
step to pair each word with the number 1. This gives us a tuple with a word to combine on, and a number to add up (feels funky, but it works).
import apache_beam as beam
import re

pipeline = beam.Pipeline()
outputs = (
    pipeline
    | 'Read in dataset' >> beam.io.ReadFromText('data/*.txt')
    | 'Split the line on the tab' >> beam.Map(lambda line: line.split("\t"))
    | 'Filter elements where there is a match' >> beam.Filter(lambda line: line[0] == "string1")
    | 'Find distinct words using regex' >> beam.FlatMap(lambda line: re.findall(r"[a-zA-Z']+", line[1]))
    # Pair each word with 1 so there is a key to combine on and a value to sum
    | 'Pair words with 1' >> beam.Map(lambda word: (word, 1))
    # Perform the aggregation
    | 'Count (sum up) occurrences of individual words' >> beam.CombinePerKey(sum)
    | 'Write out' >> beam.io.WriteToText(...)
    | 'Print the file name' >> beam.Map(print)
)
pipeline.run()
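Here's a minimal standalone sketch of the (word, 1) trick on made-up in-memory words, alongside Beam's built-in beam.combiners.Count.PerElement, which does the same pairing and summing for you.
import apache_beam as beam

pipeline = beam.Pipeline()
words = pipeline | 'Create words' >> beam.Create(['a', 'b', 'a', 'a'])
# Manual version: pair each word with 1, then sum per key -> ('a', 3), ('b', 1)
(
    words
    | 'Pair with 1' >> beam.Map(lambda word: (word, 1))
    | 'Sum per word' >> beam.CombinePerKey(sum)
    | 'Print counts' >> beam.Map(print)
)
# Built-in shortcut: Count.PerElement produces the same (word, count) pairs.
words | 'Count per element' >> beam.combiners.Count.PerElement() | 'Print counts 2' >> beam.Map(print)
pipeline.run()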