Playing with Apache Beam on Azure — Part 1: A Basic Example

A basic exploration of Apache Beam on Azure: connecting to storage, then transforming and moving data into Cosmos DB.

Something I’ve been working on recently is using Apache Beam in Azure to move and transform data. The documentation on this is sparse to the point of being non-existent, so hopefully I’ll be able to fill some gaps with what I discovered while implementing a small Apache Beam pipeline running on Azure, moving data from Azure Storage to Cosmos DB (MongoDB API).

Apache Beam

First off, what is Apache Beam? Well basically it’s a framework for developing data processing jobs for streaming or batch data, which can then run on a variety of underlying engines or “runners”. So using one of the Beam SDKs you can develop a job to take, say, JSON messages from some source, do some transformations on them and push them to a downstream sink, and then you have the option to run those jobs on a number of distributed processing engines, for instance:

  • Apache Spark
  • Apache Flink
  • Apache Samza
  • Google Cloud Dataflow

There is also a Direct Runner, which runs things standalone on a single machine and is typically used for development and testing. There’s plenty more information on Apache Beam here: Learn about Beam (apache.org)

In this example I’ll use the Direct Runner to validate we can connect to things in Azure and do stuff with that data. In a later post I’ll talk about setting up Beam to run on other engines like Flink on AKS (maybe).

Connectors and Azure

Which does bring me to the subject of connectors. As with any data processing tooling, the key question is — will it be able to connect to my data sources? If you can’t fetch your data or put it where you want it, this limits the value of any tool. The list of “default” connectors for Apache Beam is here: Built-in I/O Transforms (apache.org)

As you can see — the default list has a lot of gaps where Azure is concerned. If you look at the list of supported filesystem interfaces for instance there’s support for AWS and GCP storage but not Azure storage.

Apache Beam Java SDK Default Filesystem Interfaces

However, that’s not the end of the story: there are SDK extensions that allow reading from Azure Storage accounts; they’re just not very well documented.

Similarly on the Cosmos DB end there is currently no connector for the Core API, but in my example I was using the Cosmos DB Mongo API, which worked perfectly with the Beam MongoDB connector.

Let’s Just Get On With The Example

First off, I’m going to assume you’re familiar with a few things to follow this example:

  • Java programming
  • The Maven build automation tool
  • Navigating around the Azure portal and basic management of Azure Storage and Cosmos DB accounts.

In this simple example I’m going to create a pipeline using the Java SDK for Apache Beam that:

  1. Creates a connection to an Azure blob storage account
  2. Reads a sample.csv file from that account
  3. Converts each row in the CSV file to a JSON document
  4. Writes the JSON documents to Cosmos DB using the Mongo API

I’m not going to show you how to create the storage / Cosmos accounts, or upload CSV files to the storage account; there’s plenty of documentation covering that sort of thing.

For now we’ll use Apache Beam’s Direct Runner, which essentially means it will run standalone on your dev machine. Where Azure is concerned you can also tell Beam to run this on Databricks or HDInsight using the Spark Runner, or on Apache Flink on AKS using the Flink Runner. I’ve had this running on Flink on AKS too, and I may cover how I did that in an additional post.

Before we start in on the code, let’s get the dependencies out of the way. I used Maven for this example, and we need to add dependencies for:

  • The Beam Java SDK
  • The Beam Direct Runner for Java (the “runtime” for Beam)
  • The Beam Java SDK for Azure IO
  • The Beam Java SDK for Mongo (to write to Cosmos with)

Here’s a snippet from the pom.xml:

<dependencies>
  <!-- Adds a dependency on the Beam SDK. -->
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-core</artifactId>
    <version>2.27.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-runners-direct-java</artifactId>
    <version>2.27.0</version>
    <scope>runtime</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-azure</artifactId>
    <version>2.27.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-mongodb</artifactId>
    <version>2.27.0</version>
  </dependency>
  <!-- other dependencies here... -->
</dependencies>

At the time of writing I was using Beam version 2.27.0. There are probably later versions available now. You should probably define the Beam version as a Maven property so you only have to change it in one place, if you care about that sort of thing (Please note: you will not get good programming practice advice from this blog post, or indeed any of my posts).

Now for the main class that runs the example. It’s very basic, and the snippets below walk through it piece by piece.

All the activity revolves around the “Pipeline” artefact. The “pipeline” in Apache Beam is the backbone of the process — it defines the series of transformations that you want to carry out on your data.

So the first thing we do is create a PipelineOptions object (actually it’s the second thing. The first thing is needlessly printing “Simple Beam on Azure Example”). This object, as the name suggests, is where we set all the options for the pipeline we’re going to create. Here is where we set up the parameters for connecting to Azure Storage, which is different from the way it works with some of the “native” connectors that Apache Beam has (as we’ll see).

Then to set up the connection to Azure we view the options object as a BlobstoreOptions object (the as() call returns a view over the same underlying options rather than a copy) in order to set the connection string parameter on it:

PipelineOptions options = PipelineOptionsFactory.create();
options.as(BlobstoreOptions.class).setAzureConnectionString("<BLOB STORE CONNECTION STRING>");

The connection string is the usual Azure storage connection string that you can obtain by looking at the Access Key settings for the storage account in the Azure Portal.

Connection Strings for Storage Account
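Hard-coding the connection string is fine for a quick test, but it is easy to commit by accident. Here is a minimal sketch of one alternative, assuming the string is supplied through an environment variable. The ConnectionStrings class and the AZURE_STORAGE_CONNECTION_STRING variable name are my own illustrative choices, not something Beam defines or reads for you.

```java
import java.util.Map;

// Hypothetical helper (not part of Beam): looks up a required setting by name.
final class ConnectionStrings {

    private ConnectionStrings() {}

    // Takes the environment as a Map so the lookup can be tested without
    // touching the real process environment; pass System.getenv() in real use.
    static String require(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException(
                "Environment variable " + name + " is not set");
        }
        return value;
    }
}
```

The result would then be passed to setAzureConnectionString(...) in place of the literal, for example ConnectionStrings.require(System.getenv(), "AZURE_STORAGE_CONNECTION_STRING"). Failing early with a clear message beats a stack trace from deep inside the storage client.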

An interesting thing to note is that you can call options.as(<CLASS>) more than once on the same pipeline options object, with a different options interface each time, to combine several sets of options.

Once those options are set, it’s time to create our main Pipeline object, passing the options to it as we do so, like this:

Pipeline p = Pipeline.create(options);

Then it’s time to use method chaining to define the series of transformations that the pipeline is going to carry out:

p.apply(TextIO.read().from("azfs://ACCOUNT/CONTAINER/FOLDER/file.csv"))
    .apply(MapElements.via(new ConvertStringToDocument()))
    .apply(MongoDbIO.write()
        .withUri("mongodb://<MONGO API CONNECTION STRING>")
        .withDatabase("beamtest")
        .withCollection("people"));

This sequence of actions does the following things:

  1. First we use the TextIO.read() method to read our sample CSV file (using the azfs:// path syntax) from the blob storage account we set up in the pipeline options. The TextIO (apache.org) class encapsulates a bunch of basic IO transforms that relate to reading and writing text files. Note that the file path has to include the storage account name and container name. Note also that we don’t have to specifically tell it to open the connection. It does that automagically.
  2. Next we use the MapElements.via(..) call to do a simple transformation from the CSV row string to the JSON format we want. Essentially I pass into the call a function (ConvertStringToDocument) that extends Beam’s SimpleFunction class; when it’s passed a String containing a row from the CSV file, it spits out a JSON document in the format we want. I’ll show the code for this further down, but you can see examples of this approach in the MapElements (apache.org) documentation.
  3. Now that we have JSON documents in the pipeline, it’s time to write them to Cosmos. At the time of writing there isn’t a Core (SQL) API connector for Cosmos in Apache Beam. So I cheat a little here and use the Mongo connector to write to Cosmos’ Mongo API. The MongoDbIO class is one of the out of the box connectors and you can see how we build the connection object, passing the connection string, and details of the database and collection. The connection string can be obtained from the Cosmos account details in the Azure Portal, in a similar way to the storage connection string above — see Connect a MongoDB application to Azure Cosmos DB | Microsoft Docs.

At this point we haven’t actually processed anything; we’ve just defined what the pipeline will try to do when we run it.
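If the "define now, run later" idea feels abstract, plain Java streams behave in a loosely similar way. The sketch below is an analogy only: it uses java.util.stream rather than Beam, and the DeferredExecutionDemo class is a made-up name for illustration. Defining the mapping step does no work; the counter only moves once a terminal operation runs, much as nothing happens to our pipeline until we run it.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Analogy only: plain Java streams, not Beam.
final class DeferredExecutionDemo {

    // Counts how many rows the mapping step has actually processed.
    static final AtomicInteger processed = new AtomicInteger();

    private DeferredExecutionDemo() {}

    // Defines the steps; the lambda does not execute until the stream is consumed.
    static Stream<String> define(List<String> rows) {
        return rows.stream().map(row -> {
            processed.incrementAndGet();
            return row.toUpperCase();
        });
    }

    public static void main(String[] args) {
        Stream<String> defined = define(Arrays.asList("kenny,male,glasgow"));
        System.out.println("Processed after defining: " + processed.get());

        // The terminal operation is what triggers the work.
        List<String> result = defined.collect(Collectors.toList());
        System.out.println("Processed after running: " + processed.get() + " " + result);
    }
}
```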

As a little aside, I want to quickly look at the code for the MapElements transformation — a really simple way to take some data thing and turn it into some other data thing.

You need to be mindful of what types your transformations are passing into the pipeline, or expecting to get from the pipeline. For instance here the TextIO.read() from blob reads the file line by line and we get strings back as a result. The MongoDbIO.write() method though (see MongoDbIO.Write (apache.org)) expects to see org.bson.Document objects passed to it. If we try and just pass it CSV strings, or even string representations of JSON, it will simply fall over in a heap of Exceptions. Hence the ConvertStringToDocument class I wrote takes a CSV string, and shoves it into a Document object.

import org.apache.beam.sdk.transforms.SimpleFunction;
import org.bson.Document;

public class ConvertStringToDocument extends SimpleFunction<String, Document> {

    @Override
    public Document apply(String input) {
        try {
            // Split the CSV row into columns (no support for quoted commas).
            String[] elements = input.split(",");
            // Map the first three columns onto named fields.
            Document document = new Document();
            document.put("name", elements[0]);
            document.put("gender", elements[1]);
            document.put("city", elements[2]);
            return document;
        } catch (Exception e) {
            // Surface any parsing failure (e.g. too few columns) to the pipeline.
            throw new RuntimeException(e);
        }
    }
}

Again, this is a really basic ugly example, but it demonstrates the principle. The function extends SimpleFunction and overrides the apply() method to take my CSV row String, pull out the first three columns and create a basic org.bson.Document document out of them. So in my case I have a simple list of people, gender and location, and this:

kenny,male,glasgow

becomes this:

{
"name" : "kenny",
"gender" : "male",
"city" : "glasgow"
}

In an org.bson.Document object that the Mongo IO write method is happy to deal with.
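The conversion above assumes every row has at least three columns, and a short row only shows up as a wrapped ArrayIndexOutOfBoundsException. Here is a slightly more defensive sketch of just the parsing step, kept free of Beam and BSON types so it can be unit-tested on its own. The CsvRowParser class is a hypothetical helper of mine, not part of the original example, and like the original it does not handle quoted fields containing commas.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: turns one CSV row into an ordered field map.
final class CsvRowParser {

    // Field names, in the same order as the columns in the sample file.
    private static final String[] FIELDS = {"name", "gender", "city"};

    private CsvRowParser() {}

    static Map<String, Object> parse(String row) {
        // A limit of -1 keeps trailing empty columns instead of dropping them.
        String[] elements = row.split(",", -1);
        if (elements.length < FIELDS.length) {
            throw new IllegalArgumentException(
                "Expected at least " + FIELDS.length + " columns but got "
                    + elements.length + ": " + row);
        }
        // LinkedHashMap preserves insertion order, so fields stay in column order.
        Map<String, Object> fields = new LinkedHashMap<>();
        for (int i = 0; i < FIELDS.length; i++) {
            fields.put(FIELDS[i], elements[i].trim());
        }
        return fields;
    }

    public static void main(String[] args) {
        System.out.println(parse("kenny,male,glasgow"));
    }
}
```

Inside ConvertStringToDocument.apply() the map could then be wrapped with new Document(CsvRowParser.parse(input)), since org.bson.Document has a constructor that accepts a map. Any extra columns beyond the third are ignored, as in the original.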

Finally, all that’s left to do back in the main App class is run the pipeline we’ve defined:

p.run().waitUntilFinish();

This just kicks off the pipeline and then waits in synchronous fashion until it’s finished.

Running the example is just a case of running the main class in App, just like you would any other basic “Hello World” type example. All being well, after a few seconds you should see your CSV file contents happily written to Cosmos.

If you haven’t and are now instead looking at a big Java stack trace, well… there are plenty of things that could cause issues here. Did you open the Azure Storage firewall to allow your client to connect? Did you remember to put the name of your storage account in the AZFS file path? Really, at this stage you’re on your own.
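One of those checks is cheap to automate. Here is a small sketch that verifies a path has the azfs://ACCOUNT/CONTAINER/PATH shape before the pipeline is even built. The AzfsPathCheck class is hypothetical (my name, not a Beam API), and it only checks the shape of the string; it cannot tell you whether the account exists or the firewall is open.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical pre-flight check (not part of Beam).
final class AzfsPathCheck {

    // Scheme, then account, container and a non-empty blob path.
    private static final Pattern AZFS =
        Pattern.compile("^azfs://([^/]+)/([^/]+)/(.+)$");

    private AzfsPathCheck() {}

    // Returns {account, container, blob path} or throws if a part is missing.
    static String[] split(String path) {
        Matcher m = AZFS.matcher(path);
        if (!m.matches()) {
            throw new IllegalArgumentException(
                "Expected azfs://ACCOUNT/CONTAINER/PATH but got: " + path);
        }
        return new String[] {m.group(1), m.group(2), m.group(3)};
    }

    public static void main(String[] args) {
        String[] parts = split("azfs://myaccount/mycontainer/folder/file.csv");
        System.out.println(
            "account=" + parts[0] + " container=" + parts[1] + " blob=" + parts[2]);
    }
}
```

A path that leaves out the account name, such as azfs://mycontainer/file.csv, has only two segments and is rejected here rather than failing later inside the pipeline.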

Some Closing Thoughts

The example above really just shows how to connect to a couple of basic Azure services with Apache Beam, which I wrote up partly for my own benefit because the documentation isn’t there.

For this we only used the Direct Runner, which, executing on your local machine, doesn’t provide a lot of scale for transformations and of course has to pass data over the internet to the Azure services. The next step would be to run this stuff on Azure itself. You could run this on a VM in Azure still using the Direct Runner, but to get true scale you really want to take advantage of the runners that can distribute and parallelise the transformations, such as the Apache Spark runner or the Apache Flink runner.

After I got this running I deployed it on an Apache Flink cluster running on Azure Kubernetes Service (AKS), which hugely increased the scale of the processing possible. But that’s the subject for another blog post.

Roving NoSQL specialist @Microsoft