
ETL Pipeline with Postgres

For this project, we apply data modeling using Python. We will create an ETL pipeline to Extract information about all the restaurants in the Berlin area from Yelp, Transform the data with a Python pipeline, and finally Load the data into a SQL database, cleaned and ready for analysis.

Enter Pipeline

The solution to this problem is a data pipeline. Just as a physical pipeline drives the flow of material from a source to a destination, a data pipeline carries the flow of data from a source to the desired destination. But not only that: it can also apply all the necessary transformations while transferring the data, so the data reach their destination in the desired format, ready to be analyzed.

Pipeline

The diagram above gives a simple picture of what a pipeline is. In its simplest form, it is just a command that reads some data from a source, applies some transformations (or none), and drives the data to a destination or even to another pipeline.

Project Background

The case scenario of this project is as follows:

We work for a small startup that wants to develop new services in the food industry. They need an efficient way to gather all the information about the restaurants in the Berlin area and run some analysis to see in which categories of restaurants people tend to eat and leave the most positive reviews.

For this project, the source is the Yelp API and the destination is a PostgreSQL database, so the pipeline looks like this:

Pipeline

An API lets us make GET/POST requests and sends back a payload with the appropriate information, depending on what we send it. The logic behind every API is the same, but each one uses its own style of commands that have to be sent with each request in order to receive the correct response.

A request to Yelp's API has the following format:

 response = requests.get(api_url, params=params, headers=headers)

In the params dictionary, we define what we want to look for; in this case it is the following:

params = {"term": "restaurants",
          "location": "berlin"}

By passing term: restaurants and location: berlin, we ask the API to look for the term restaurants located in Berlin.

The request is almost ready, but it needs one more piece of information. Although it is a public API, you have to be registered to use it. After registration you receive a KEY, and this key has to be passed with the request inside the headers, like this:

headers = {"Authorization": "Bearer {}".format(api_key)}
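Putting the pieces together, a minimal request sketch could look like the following. The URL is Yelp's business search endpoint, and the hard-coded key is only a placeholder (the next section shows how to keep it out of the code):

import requests

api_key = "my-private-key"  # placeholder; see the next section on hiding credentials
api_url = "https://api.yelp.com/v3/businesses/search"  # Yelp business search endpoint

params = {"term": "restaurants", "location": "berlin"}
headers = {"Authorization": "Bearer {}".format(api_key)}

response = requests.get(api_url, params=params, headers=headers)
data = response.json()  # the payload comes back as JSON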

Connect to the API

As mentioned before, to make a request we need credentials that it is wise not to share in public projects. There are a lot of ways to hide them from the public; I will demonstrate two of them.

Extract credentials to the environment

This approach requires creating a .bat file with the commands needed to set a variable in the local environment. After doing that, it is easy to retrieve this information in our code without fear of revealing it to the public.

*.bat file

set API_KEY=my-private-key

Run the file

my_bat.bat
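Back in our Python code, the key can then be read from the environment, for example:

import os

# Retrieve the key that the .bat file placed in the environment
api_key = os.environ.get("API_KEY")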

Make a file and import it in your project

This approach is a bit more Pythonic. It only requires creating a new file in which each variable holds a value that our project needs and that has to stay private.

environment_consts.py file

API_KEY = 'api key'

Project file

from environment_consts import API_KEY

Connect to the database

For this project, I chose to work with PostgreSQL, and I used the psycopg2 Python library to connect to my database and execute queries against it. We can execute SQL queries either with the pgAdmin tool or from Python.

If we want to reach our DB from Python, we need to export into the environment all the parameters used for the connection, such as the database name, password, host, etc., but this step is already covered above.

To execute a query from Python, we first make a connection to the DB; after that, we create a cursor, the object that handles the execution of each query. At the end, we need to close the connection. Luckily, pandas supports transferring a data frame to a SQL database automatically, but only through the SQLAlchemy library, so for this one step we have to use that specific library.
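A minimal sketch of that flow is shown below; the connection parameters, the engine URL, and the table name are placeholders, and the small data frame only stands in for the real one built later from the API response:

import pandas as pd
import psycopg2
from sqlalchemy import create_engine

# Open a connection and a cursor, run a query, then clean up
conn = psycopg2.connect(host="localhost", dbname="yelp", user="postgres", password="password")
cur = conn.cursor()
cur.execute("SELECT version();")
print(cur.fetchone())
cur.close()
conn.close()

# pandas writes a DataFrame to Postgres only through a SQLAlchemy engine
engine = create_engine("postgresql://postgres:password@localhost:5432/yelp")
df = pd.DataFrame({"name": ["Stadtklause"], "review_count": [172]})  # placeholder frame
df.to_sql("restaurants_raw", engine, if_exists="replace", index=False)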

Data format and the Transform process

The data we receive from the API are in JSON format and look like this:

{
  "businesses": [
    {
      "id": "8uBE9ZgfZOM2z_Rllw478Q",
      "alias": "stadtklause-berlin",
      "name": "Stadtklause",
      "image_url": "https://s3-media3.fl.yelpcdn.com/bphoto/vqFLzToYyFfAoYKlBRq0jA/o.jpg",
      "is_closed": false,
      "url": "https://www.yelp.com/biz/stadtklause-berlin?adjust_creative=N9q5J26XY3aaHLZrjJ7g7g&utm_campaign=yelp_api_v3&utm_medium=api_v3_business_search&utm_source=N9q5J26XY3aaHLZrjJ7g7g",
      "review_count": 172,
      "categories": [
        {
          "alias": "german",
          "title": "German"
        }
      ]
    }
  ]
}

So we have to grab the businesses part, read each key-value pair, and load it into a pandas data frame. The problem is that the JSON is deeply nested: there are dictionaries nested inside the data, which we have to iterate over again to get their key-value pairs. If we read the data as they are, the result looks like this:

Deep nested JSON
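The naive read that produces a frame like the one above is just a direct load of the businesses list (a sketch, with data assumed to be the parsed response from earlier):

import pandas as pd

# data = response.json() from the extract step
df_raw = pd.DataFrame(data["businesses"])
print(df_raw.head())  # the 'categories' column still holds lists of dictionaries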

We can see there are columns with dictionaries as entries. That is what we can fix by reading the JSON in a more elaborate way, with json_normalize.

from pandas import json_normalize

# Expand the nested 'categories' list and keep selected top-level fields;
# the record_path/meta choices below are examples based on the JSON above
df = json_normalize(data['businesses'], sep="_",
                    record_path="categories",
                    meta=["id", "name", "review_count"],
                    meta_prefix="business_",
                    record_prefix="category_")

Normalized JSON

More about how this works can be found in the pandas json_normalize documentation.

Final step

After loading the raw data into the database, we can use any DBMS tool for further analysis or to normalize the raw table.
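For example, here is a rough sketch of such a normalization run from Python; the table and column names are assumptions based on the normalized frame and the restaurants_raw table used earlier:

import psycopg2

# Connection parameters are placeholders
conn = psycopg2.connect(host="localhost", dbname="yelp", user="postgres", password="password")
cur = conn.cursor()

# Split the flat raw table into a businesses table and a categories table
cur.execute("""
    CREATE TABLE IF NOT EXISTS businesses AS
    SELECT DISTINCT business_id, business_name, business_review_count
    FROM restaurants_raw;
""")
cur.execute("""
    CREATE TABLE IF NOT EXISTS categories AS
    SELECT DISTINCT business_id, category_alias, category_title
    FROM restaurants_raw;
""")

conn.commit()
cur.close()
conn.close()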

Database in final form after normalization