Last week we learned about dlt (data load tool) in a workshop conducted by Adrian Brudaru and hosted by Data Talks Club.
dlt can extract data from APIs in a scalable way, normalize and load the data, and do these steps incrementally, all with very little code. dlt is an open-source library that automates the most tedious parts of data ingestion: it takes care of data loading, schema management, data type detection, self-healing, and scalable extraction.
Data loading consists of extracting data from a producer, transporting it to a convenient environment, preparing it for usage by normalizing it, cleaning it, and adding metadata. For example, a .csv file doesn’t fully describe the data. We have to add the metadata. We have to add a schema. A .parquet file, on the other hand, has a schema and could be considered fully loaded.
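To make that concrete, here is a small sketch (my own, not from the workshop; the file names are hypothetical) contrasting the two formats: a Parquet file carries its own schema, while a CSV gives you only a header row of column names.

import pyarrow.parquet as pq

# Hypothetical local files, just to illustrate the difference.
# A Parquet file stores its schema (column names and data types) in the file itself:
print(pq.read_schema("trips.parquet"))

# A CSV file only gives us a header row of names; types and other metadata are missing:
with open("trips.csv") as f:
    print(f.readline())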
In this workshop, we learned about extracting, normalizing, and loading data. We learned about incremental data loading, but not incremental data extraction, which is more complex and for another day.
Extracting data
Most data is stored behind an API, which serves it either as records of data or as a file (the latter is cheaper and faster for the provider). The source can also be another database, which typically returns records as JSON.
There are a few constraints on the extraction process. The first is hardware limits: how to manage memory. The second is network limits, which we can’t do much about other than to retry. Finally, there are source API limits, such as rate limits.
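As an illustration of the retry idea, here is a minimal sketch (my own, not from the workshop; the helper name and parameters are made up) of retrying transient network errors and rate limits with exponential backoff:

import time
import requests

def get_with_retries(url, params=None, max_retries=5, backoff_seconds=2):
    # Hypothetical helper: retry transient network failures and HTTP 429
    # rate-limit responses with exponential backoff.
    for attempt in range(max_retries):
        try:
            response = requests.get(url, params=params, timeout=30)
            if response.status_code == 429:  # rate limited by the source API
                time.sleep(backoff_seconds * 2 ** attempt)
                continue
            response.raise_for_status()
            return response
        except requests.exceptions.ConnectionError:
            time.sleep(backoff_seconds * 2 ** attempt)
    raise RuntimeError(f"giving up on {url} after {max_retries} attempts")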
The problem with memory management is running out of RAM or disk space, and running out of RAM is very bad. You usually use an orchestrator that runs several pipelines at once, so if you run out of RAM, not only does your pipeline crash, you also crash every worker process running on that machine.
Since we don’t know how much data we’re importing, and since we cannot scale RAM dynamically or infinitely, we must control the maximum memory that we use.
Instead, we can stream, that is, read the data line by line or chunk by chunk. Familiar examples include video streaming and a GPS map view. Data engineers stream data between buffers: from an API to a local file, from webhooks to event queues, or from an event queue such as Kafka to a bucket. (A webhook is a system that sends data to another system as a POST request in real time, in response to some kind of trigger.)
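For example, here is a minimal sketch (my own, not from the workshop; the URL is a placeholder) of streaming an API response straight to a local file one chunk at a time, so the full payload never has to fit in RAM:

import requests

SOURCE_URL = "https://example.com/big_export.csv"  # placeholder URL

with requests.get(SOURCE_URL, stream=True) as response:
    response.raise_for_status()
    with open("big_export.csv", "wb") as f:
        # read the body in 1 MB chunks instead of loading it all at once
        for chunk in response.iter_content(chunk_size=1024 * 1024):
            f.write(chunk)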
Streaming in Python via generators
We use generators, which are functions that can return data multiple times. The data can be released as it’s produced, allowing us to process it as a stream rather than downloading it all first, which could crash the process by running out of memory.
In the first example, we’re getting cat pictures from Twitter. There could be 10, there could be 10,000,000. We don’t know. We could try to first read all the data into memory. But we may run out of memory. Here’s the wrong way first:
def search_twitter(query):
    data = []
    for row in paginated_get(query):
        data.append(row)
    return data

# Collect all the cat picture data
for row in search_twitter("cat pictures"):
    # Once collected,
    # print row by row
    print(row)
Instead, we use a generator, which yields data as it comes in.
def extract_data(query):
    for row in paginated_get(query):
        yield row

# Get one row at a time
for row in extract_data("cat pictures"):
    # print the row
    print(row)
    # do something with the row such as cleaning it and writing it to a buffer
    # continue requesting and printing data
This was a very simple example, but I needed to understand it to go on. I hadn’t seen the keyword “yield” before, so I asked ChatGPT about it when the workshop was over. It said: “Yes, ‘yield’ is indeed a Python keyword, and it’s used in the context of generator functions. When you use ‘yield’ inside a function, it turns that function into a generator. A generator function is a special type of function in Python that generates a sequence of values one at a time. Instead of using ‘return’ to return a single value and terminate the function’s execution, generator functions use ‘yield’ to produce a series of values over multiple invocations. When a function contains a ‘yield’ statement, calling that function returns a generator object. When you iterate over the generator object using a loop, the function’s execution is paused at the ‘yield’ statement, and the value following ‘yield’ is returned. The function’s state is retained, allowing it to resume execution from where it left off when the next value is requested.”
What this means is that in the “cat pictures” example, the generator function yields cat pictures one at a time, as you process them, and keeps yielding until you have iterated through all of them.
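To see that pausing-and-resuming behavior directly, here is a tiny self-contained example (my own, not from the workshop):

def count_up_to(n):
    # A minimal generator: yields one number at a time instead of building a list.
    i = 1
    while i <= n:
        yield i  # execution pauses here until the next value is requested
        i += 1

gen = count_up_to(3)
print(next(gen))   # 1 -- runs until the first yield
print(next(gen))   # 2 -- resumes where it left off
print(list(gen))   # [3] -- the values that remain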
Data Extraction
The second example is from the hands-on part of the workshop. First, he shows us how to use a generator to fetch pages of 1,000 records each; it keeps requesting pages until all the data has been retrieved.
import requests

BASE_API_URL = "https://us-central1-dlthub-analytics.cloudfunctions.net/data_engineering_zoomcamp_api"

# I call this a paginated getter
# as it's a function that gets data
# and also paginates until there is no more data
# by yielding pages, we "microbatch", which speeds up downstream processing
def paginated_getter():
    page_number = 1
    while True:
        # Set the query parameters
        params = {'page': page_number}

        # Make the GET request to the API
        response = requests.get(BASE_API_URL, params=params)
        response.raise_for_status()  # Raise an HTTP Error for bad responses
        page_json = response.json()
        print(f'got page number {page_number} with {len(page_json)} records')

        # if the page has no records, stop iterating
        if page_json:
            yield page_json
            page_number += 1
        else:
            # No more data, break the loop
            break

if __name__ == '__main__':
    # Use the generator to iterate over pages
    for page_data in paginated_getter():
        # Process each page as needed
        print(page_data)
You can see in the output that the generator function (paginated_getter) only iterates once before the calling loop prints the output.
got page number 1 with 1000 records
[{'End_Lat': 40.742963, 'End_Lon': -73.980072...
got page number 2 with 1000 records
[{'End_Lat': 40.72205, 'End_Lon': -73.990365...
...
It’s cheaper for an API to give you a link to a file than to serve the data page by page. The catch is memory: if we download the whole file and then open it to process the data, the data is held twice, so a 4 GB file can end up using 8 GB of RAM.
Instead, we can use a stream download, which gives us the best of both worlds: throughput (download speed) is high, and memory management is easy. The downside is that this is difficult to do for columnar file formats. He gives an example of reading a .jsonl file, which is already split into one record per line. If we had to read a regular .json file, we could use the ijson library to parse it incrementally without loading it all into memory (a sketch of that appears after the jsonl example below).
import requests
import json

url = "https://storage.googleapis.com/dtc_zoomcamp_api/yellow_tripdata_2009-06.jsonl"

def stream_download_jsonl(url):
    response = requests.get(url, stream=True)
    response.raise_for_status()  # Raise an HTTPError for bad responses
    for line in response.iter_lines():
        if line:
            yield json.loads(line)

# Use the generator to iterate over rows with minimal memory usage
row_counter = 0
for row in stream_download_jsonl(url):
    print(row)
    row_counter += 1
    if row_counter >= 5:
        break
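For a plain .json file (say, one large top-level array), a rough sketch with ijson might look like the following. This is my own example, not from the workshop; the URL is a placeholder, and it assumes the ijson package is installed.

import ijson
import requests

JSON_URL = "https://example.com/large_file.json"  # placeholder: a big top-level JSON array

def stream_parse_json(url):
    # stream=True keeps the response body out of memory;
    # ijson.items() yields one array element at a time at the "item" prefix.
    with requests.get(url, stream=True) as response:
        response.raise_for_status()
        response.raw.decode_content = True  # let urllib3 handle any gzip encoding
        for record in ijson.items(response.raw, "item"):
            yield record

for i, record in enumerate(stream_parse_json(JSON_URL)):
    print(record)
    if i >= 4:  # only peek at the first few records
        break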
Next he shows us how to use dlt to accomplish the extraction and write the results to the DuckDB database.
import dlt

# define the connection to load to.
# We now use duckdb, but you can switch to Bigquery later
generators_pipeline = dlt.pipeline(destination='duckdb', dataset_name='generators')

# we can load any generator to a table at the pipeline destination as follows:
generators_pipeline.run(paginated_getter(),
                        table_name="http_download",
                        write_disposition="replace")

# we can load the next generator to the same or to a different table.
generators_pipeline.run(stream_download_jsonl(url),
                        table_name="stream_download",
                        write_disposition="replace")
Here, we use the paginated_getter generator that we defined previously. We are telling dlt to run the generator and put the data into a table in DuckDB. In the second call, we use the stream_download_jsonl generator.
DuckDB is an in-process analytical database, similar in spirit to SQLite. We create some files of data and DuckDB can read them. DuckDB is a library that can run almost anywhere, and it works like a local alternative to a data warehouse, which makes it good for development. You can develop against DuckDB and, when you’re ready to go to production, write to BigQuery instead.
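As a sketch of that switch (assuming BigQuery credentials are configured the way dlt expects, for example in its secrets.toml), only the destination changes:

import dlt

# Same pipeline as before, but pointed at BigQuery instead of DuckDB.
production_pipeline = dlt.pipeline(destination='bigquery', dataset_name='generators')

# The generator and the run() call stay exactly the same.
production_pipeline.run(paginated_getter(),
                        table_name="http_download",
                        write_disposition="replace")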
Finally, we have a look at the table we created in DuckDB.
import duckdb
conn = duckdb.connect(f"{generators_pipeline.pipeline_name}.duckdb")
# let's see the tables
conn.sql(f"SET search_path = '{generators_pipeline.dataset_name}'")
print('Loaded tables: ')
display(conn.sql("show tables"))
# and the data
print("\n\n\n http_download table below:")
rides = conn.sql("SELECT * FROM http_download").df()
display(rides)
print("\n\n\n stream_download table below:")
passengers = conn.sql("SELECT * FROM stream_download").df()
display(passengers)
You can see here that the input to DuckDB is a SQL query, and that the output (I won’t show that here) is the same in both cases.
If you want to run these commands yourself, either in a Jupyter notebook or in Google Colab, you can get the file from HERE. You can get an overview of the workshop HERE. When I ran it in a Jupyter notebook, I had to delete the first line (%%capture) and put quotes around dlt[duckdb] in the second line.
# %%capture
!pip install 'dlt[duckdb]'