Preface

Many data pipeline frameworks offer very similar functionality. With this in mind, Google developed a unified data pipeline framework under the name Cloud Dataflow SDK. This framework was later donated to the Apache Software Foundation and renamed Apache Beam. Let’s look at the following figure to understand Apache Beam better.

Source: https://cloud.google.com/blog/products/gcp/dataflow-and-open-source-proposal-to-join-the-apache-incubator

We create a single pipeline, which then allows us to do either Batch Processing or Stream Processing. The processing is executed by a runtime of our choice: we can test our code locally with the Direct Runner or send it to a cloud service such as Google Cloud Dataflow. This makes Apache Beam a very interesting framework to try.

In this article, we will analyze a COVID dataset with Apache Beam.

The Dataset

The dataset is in German, but don’t worry, I will explain the relevant parts. It is provided by the Robert-Koch-Institut here. The dataset looks as follows.

FID,IdBundesland,Bundesland,Landkreis,Altersgruppe,Geschlecht,AnzahlFall,AnzahlTodesfall,Meldedatum,IdLandkreis,Datenstand,NeuerFall,NeuerTodesfall,Refdatum,NeuGenesen,AnzahlGenesen,IstErkrankungsbeginn,Altersgruppe2
1,1,Schleswig-Holstein,SK Kiel,A15-A34,M,1,0,2020/03/23 00:00:00,01002,"15.12.2020, 00:00 Uhr",0,-9,2020/03/20 00:00:00,0,1,1,Nicht übermittelt
2,1,Schleswig-Holstein,SK Flensburg,A00-A04,M,1,0,2020/09/30 00:00:00,01001,"15.12.2020, 00:00 Uhr",0,-9,2020/09/30 00:00:00,0,1,0,Nicht übermittelt
3,1,Schleswig-Holstein,SK Kiel,A15-A34,M,1,0,2020/03/23 00:00:00,01002,"15.12.2020, 00:00 Uhr",0,-9,2020/03/22 00:00:00,0,1,1,Nicht übermittelt
4,1,Schleswig-Holstein,SK Flensburg,A00-A04,M,1,0,2020/10/29 00:00:00,01001,"15.12.2020, 00:00 Uhr",0,-9,2020/10/29 00:00:00,0,1,0,Nicht übermittelt
5,1,Schleswig-Holstein,SK Kiel,A15-A34,M,1,0,2020/03/25 00:00:00,01002,"15.12.2020, 00:00 Uhr",0,-9,2020/03/16 00:00:00,0,1,1,Nicht übermittelt

Every day the RKI publishes a new version of this file. Each row represents a case, which may be an infection, a death, or a recovery. For each of these cases there is an indicator that is either 0, 1, or -1. A 0 indicates that the case is both in today’s publication and in yesterday’s publication. A 1 indicates that a case is only in today’s publication. A -1 indicates that a case is only in yesterday’s publication. So to get the total number of infections for a day, you have to filter for (0, 1). If you are interested in the delta to the previous day, you filter for (-1, 1).

One more important detail is that a row may contain multiple infections. This is described by “AnzahlFall”. We have to consider this later on. Other fields specify county, age group, sex, and date. These are the relevant fields for this article.
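
To make the indicator logic concrete, here is a minimal sketch with a hypothetical CovidCase record (assuming a recent Java version for the record syntax). It is not part of the actual pipeline code; it only spells out the two filters described above.

import java.util.function.Predicate;

public class CaseFilters {

    // Hypothetical record holding only the two fields that matter here.
    record CovidCase(int neuerFall, int anzahlFall) { }

    // Total infections of a day: the case is part of today's publication (0 or 1).
    static final Predicate<CovidCase> IN_TODAYS_PUBLICATION =
            row -> row.neuerFall() >= 0;

    // Delta to the previous day: the case is only in one of the two publications (1 or -1).
    static final Predicate<CovidCase> DELTA_TO_YESTERDAY =
            row -> row.neuerFall() != 0;

    // When aggregating, sum anzahlFall instead of counting rows,
    // because a single row can represent several infections.
}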

Code Structure

.
├── build.gradle
├── gradlew
├── pipeline_results
│  ├── cases_by_county.csv
│  ├── cases_by_sex.csv
│  ├── deaths_by_age_group.csv
│  ├── deaths_per_day.csv
│  ├── deaths_under_80_by_county.csv
│  ├── recovered_by_age_group.csv
│  └── timespan_between_illness_and_reporting_date_by_county.csv
├── settings.gradle
└── src
   └── main
      ├── java
      │  └── hutan
      │     ├── EntryPoint.java
      │     └── tasks
      │        ├── FindTheTenDaysWithMostDeaths.java
      │        ├── FindTopThreeTimeDifferenceMeansBetweenIllnessStartAndReportDateByCounty.java
      │        ├── SumCasesByCounty.java
      │        ├── SumCasesBySex.java
      │        ├── SumDeathsOfPersonsUnderAgeOf80ByCounty.java
      │        └── SumRecoveredByAgeGroupAndDay.java
      └── resources
         └── logback.xml

We use Gradle as our build system, and all relevant Java code lives inside /src. EntryPoint.java wraps our tasks; these tasks are our Pipelines and they are located in the tasks package.

Dependencies

Let’s look at our Gradle dependencies.

dependencies {

    // Apache Beam
    implementation 'org.apache.beam:beam-sdks-java-core:2.39.0'
    implementation 'org.apache.beam:beam-runners-direct-java:2.39.0'

    // Spark
//    implementation 'org.apache.beam:beam-runners-spark-3:2.40.0'
//    implementation 'org.apache.spark:spark-core_2.12:3.3.0'

    // Logging
    implementation 'org.slf4j:slf4j-api:1.7.36'
    implementation 'ch.qos.logback:logback-classic:1.2.11'

    // Lombok
    compileOnly 'org.projectlombok:lombok:1.18.24'
    testCompileOnly 'org.projectlombok:lombok:1.18.24'
    annotationProcessor 'org.projectlombok:lombok:1.18.24'
    testAnnotationProcessor 'org.projectlombok:lombok:1.18.24'

    // Others
    implementation 'joda-time:joda-time:2.10.14'
}

We need Apache Beam itself, of course: apart from the core of Apache Beam, we also need its Direct Runner. Instead of the Direct Runner, we could have worked with Spark, whose dependencies are currently commented out. Setting up a Spark cluster is not within the scope of this article; we might do this another time. Here we will only work with the Direct Runner. Apart from that, we import some other utilities.
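
Just to illustrate how the runner choice works, switching to the Spark runner would roughly look like the following sketch. It assumes the commented-out Spark dependencies are enabled; SparkPipelineOptions and SparkRunner come from the beam-runners-spark artifact.

// Sketch only: requires the Spark dependencies that are commented out above.
var options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(SparkPipelineOptions.class);
options.setRunner(SparkRunner.class);
var pipeline = Pipeline.create(options);

// Alternatively, the runner can be chosen on the command line with --runner=SparkRunner.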

Next, let’s look at EntryPoint.java.

EntryPoint

public class EntryPoint {

    public interface Options extends PipelineOptions {

        @Description("Input for the pipeline")
        @Validation.Required
        String getInput();

        void setInput(String input);
    }

    public static void main(String... args) {
        // Parse and create pipeline options
        PipelineOptionsFactory.register(Options.class);
        var options = PipelineOptionsFactory.fromArgs(args)
                .withValidation()
                .as(Options.class);

        // Create a new Apache Beam pipeline
        var pipeline = Pipeline.create(options);

        // Read input file
        var input = pipeline.apply(
                "Read all lines from input file",
                TextIO.read().from(options.getInput()));


        // Submit Pipelines
        SumCasesByCounty.calculate(input);
        SumCasesBySex.calculate(input);
        FindTheTenDaysWithMostDeaths.calculate(input);
        SumRecoveredByAgeGroupAndDay.calculate(input);
        FindTopThreeTimeDifferenceMeansBetweenIllnessStartAndReportDateByCounty.calculate(input);
        SumDeathsOfPersonsUnderAgeOf80ByCounty.calculate(input);

        // Run Pipelines
        var start = DateTime.now();
        pipeline.run().waitUntilFinish();
        var end = DateTime.now();

        var period = new Period(start, end);
        System.out.println(period);

    }
}

First, we need to parse our configuration. The only required argument is the location of our dataset. We expect CSV data without a header row here, because the Java SDK of Apache Beam currently does not support skipping the header row; there is a ticket on GitHub for this. We create an input based on the dataset, and each of our Pipelines works with that input. We can pass the same object to all Pipelines; Apache Beam won’t interfere with the original object. Once we have submitted our Pipelines, we use pipeline.run().waitUntilFinish() to block until all Pipelines are finished. Additionally, we measure the time this takes. On my home computer, it took about 30 seconds to complete all Pipelines for the 2020 dataset, which is about 100 MB in size.
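
If the input file still contained the header row, a small workaround would be to drop it right after reading. A sketch, relying on the fact that only the header line starts with the literal FID:

// Workaround sketch: drop the CSV header after reading, since TextIO
// currently offers no built-in way to skip it.
var withoutHeader = input.apply(
        "Drop CSV header",
        Filter.by(line -> !line.startsWith("FID")));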

Next, let’s look at SumCasesByCounty.java as an example of our Pipelines.

Data Pipeline Example - Cases By County

public class SumCasesByCounty {

    public static PDone calculate(PCollection<String> input) {

        return input
                .apply("KV(12:neuerFall, KV(2:bundesland, 6:anzahlFall))",
                        MapElements
                                .into(TypeDescriptors.kvs(TypeDescriptors.integers(), TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers())))
                                .via(line -> {
                                    var fields = line.split(",");
                                    return KV.of(Integer.parseInt(fields[12]), KV.of(fields[2], Integer.parseInt(fields[6])));
                                }))
                .apply("Remove non new cases",
                        Filter.by(element -> element.getKey() >= 0))
                .apply("Remove neuerFall: KV(12:neuerFall, KV(2:bundesland, 6:anzahlFall) -> KV(2:bundesland, 6:anzahlFall)",
                        MapElements
                                .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers()))
                                .via(element -> KV.of(
                                        element.getValue().getKey(),
                                        element.getValue().getValue())
                                ))
                .apply("Sum the amount of cases",
                        Sum.integersPerKey())
                .apply("Convert key value pairs to strings",
                        MapElements
                                .into(TypeDescriptors.strings())
                                .via(element -> element.getKey() + ";" + element.getValue()))
                .apply("Write to file",
                        TextIO.write().to("pipeline_results/cases_by_county.csv").withoutSharding());
    }
}

As the name implies, we are trying to aggregate the number of cases reported by each county. For that we need these fields:

  • bundesland: Strictly speaking, the German word for federal state; for simplicity, we will refer to it as county.
  • anzahlFall: How many infections were reported in the given row. We will refer to this as numberCase.
  • neuerFall: Whether this row is part of today’s publication, yesterday’s, or both. We discussed this indicator earlier on. We will refer to it as newCase.

We start with our input, followed by a chain of .apply calls. Each of these calls represents a step in our Pipeline.

We have a String at the start of each step, which describes it. Our first step is to take the CSV, which we read line by line, and extract the important fields. We do this by splitting our String on ,, which is our CSV separator symbol. We have to be cautious here, because a field may itself contain a ,. If you are, for example, working with data that contains Tweets from Twitter, you have to consider that users might use that symbol in their text and adapt your code accordingly. Here, only one of our fields contains a ,: the quoted Datenstand field, which holds a date. A naive split therefore cuts that field into two tokens and shifts every later index by one, which is why newCase is read from index 12 instead of 11. With that in mind, we extract the fields newCase, county, and numberCase in the following format: KV(newCase, KV(county, numberCase)). We choose this format with the general logic of our Pipeline in mind.
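
To make the indices concrete, here is what this split does to the first sample row; the following snippet is only an illustration and not part of the Pipeline.

// Illustration with the first row of the sample data shown earlier.
var line = "1,1,Schleswig-Holstein,SK Kiel,A15-A34,M,1,0,2020/03/23 00:00:00,01002,"
        + "\"15.12.2020, 00:00 Uhr\",0,-9,2020/03/20 00:00:00,0,1,1,Nicht übermittelt";
var fields = line.split(",");

// fields[2]  -> "Schleswig-Holstein"  (Bundesland, our county)
// fields[6]  -> "1"                   (AnzahlFall, our numberCase)
// fields[10] -> "\"15.12.2020"        (first half of the quoted Datenstand field)
// fields[11] -> " 00:00 Uhr\""        (second half of it)
// fields[12] -> "0"                   (NeuerFall, our newCase, shifted from index 11 to 12)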

Next, we have to filter based on newCase, which we do right away with Filter.by(element -> element.getKey() >= 0).

After that, we no longer need the newCase value and discard it. We are then left with the following data: KV(county, numberCase).

We want to aggregate numberCase for each county. We achieve that with Sum.integersPerKey().

Finally, we have to convert our Key-Value Pairs to Strings again, which we then can write to a new CSV file. We do that with MapElements. We have used this method twice before but did not describe it in detail, so let’s do it here. We have to describe the format into which our MapElements method will transform our data with .into(FORMAT). In this case, we transform it into a simple String, which translates to TypeDescriptors.strings(). Next, we have to describe how we are going to transform our data with the .via() method. Here we can use any kind of method, but most of the time a lambda expression will give us clean and simple-looking code. Here our input is KV(county, sum(numberCase)) and we take the key and the value and concatenate them, separating them with a ;.

In the last step, we write this data to a file. We want our data inside a single file, so we use .withoutSharding() here. Without this, we would get a file for each node involved in our Pipeline. So if we had a four-node cluster, we would get four different CSV files.
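
To illustrate the difference, the two write variants look roughly like this; with sharding, the output files follow Beam’s default shard naming.

// Default: sharded output, one file per shard, named roughly like
// pipeline_results/cases_by_county.csv-00000-of-00004
var sharded = TextIO.write().to("pipeline_results/cases_by_county.csv");

// Single output file, as used in the Pipeline above.
// .withNumShards(1) would be another way to force a single file.
var singleFile = TextIO.write().to("pipeline_results/cases_by_county.csv").withoutSharding();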

And that’s already it. This Pipeline might seem simple to you, but it already contains the most important pieces needed for most pipelines: we extract data, filter it, and aggregate it, over and over again, until we reach our desired results. These simple operations distribute very well; each of our steps can be scaled efficiently to a large number of nodes, so that we can deal with huge amounts of data.

Next, let’s visualize the data, which our Pipelines created.

Visualizations

This article is mainly about Apache Beam, but we still include visualizations to get a feeling for the data we created. Our analysis will be very superficial.

Cases By County

Here, we can see the absolute number of cases per county. This is what we extracted directly with our example Pipeline. On their own, the absolute numbers do not tell us a lot, since we also have to consider the total population of each county. So, we download the inhabitants per county from here. Then, we can look at the relative number of infections, which is the number of infections divided by the population of each county.

In contrast to the absolute numbers, we see that the relative numbers are fairly close to each other. There are many variables to consider when evaluating these numbers: How many tests were made? Were tests available for free? How many people are vaccinated? A proper analysis would require digging a lot deeper, but that is not the focus of this article, so we will leave it at this.

Recovered by Age Group

Another Pipeline we created aggregates the absolute number of recoveries per age group and day. Let’s look at the results.

Here we use the same y scale for each age group. We can see that, in absolute numbers, the age groups 15-34 and 35-59 stand out. Just like before, this does not take the total population of each age group into account. Instead of analyzing that once again, we will look at the same chart with free y scales.

Here we can see whether there is any difference in shape between the graphs of our age groups. In general, spikes occur at the same points in time, but they are not all equally big. Let’s compare the age groups 15-34 and 80+. Both have a big spike in mid-2022 and a small one at the start of 2021. If you compare the heights of these spikes, you can see that the spike at the start of 2021 is relatively high for the age group 80+. This could be for a variety of reasons. Perhaps that was the time when people met up with their families over Christmas and many infections occurred, and perhaps the younger people did not get tested as much as the elderly afterwards. Once again, a proper analysis requires more time and data.

Cases by Sex

Another Pipeline aggregates the cases by sex. We represent that in a simple table.

Sex        Infections
Male       14,184,314
Female     15,228,545
Unknown       280,130

We see a slightly higher number of cases for females. This could partly be due to the fact that there are more females than males in Germany: the population ratio is 50.54% to 49.46% in favor of females. The ratio of cases, disregarding the unknown ones, is 51.77% in favor of females (15,228,545 of 29,412,859 cases with a known sex).

Days with most Deaths

The last Pipeline we will look at aggregates the deaths per day and gives us the 10 days with the most deaths.

Date          Deaths
2020/12/30    1289
2020/12/23    1253
2020/12/29    1249
2020/12/22    1199
2020/12/17    1167
2020/12/10    1078
2020/12/16    1070
2020/12/18    1047
2021/01/05    1041
2021/01/07    1030

Most deaths occurred during December of 2020. We see two outliers in May and July of 2021.

Other Pipelines

We had two more Pipelines which, unfortunately, were too demanding to run locally on the full dataset. The first determines the time difference between illness onset and reporting date per county; the second aggregates the deaths of persons under the age of 80 per county. Apache Beam is not made for running locally; you are only supposed to test your code locally on a small dataset. We will omit those Pipelines here.

Conclusion

Traditionally, we had different systems for different use cases: Spark for batch processing, Flink for stream processing, Kafka as a message broker. By now, these systems have been extended, and we can, for example, do stream processing in Spark or with Kafka. Apache Beam tries to unify these Pipelines, since they all operate on a very similar set of operations. While Apache Beam is still relatively new and lacks some features, such as skipping the header row of a CSV file, it makes distributed Pipelines easy. You can choose from a variety of runners depending on your needs. Check out the available runners and the implemented functionalities here.

Apache Beam is the first framework I have worked with to create distributed data pipelines. The documentation is good and everything is intuitive. I enjoyed working with Apache Beam.