Databricks Tutorial with Code Examples

Example 1: Creating an ETL Pipeline in Databricks
from pyspark.sql import SparkSession

# Start (or reuse) a Spark session, then copy raw CSV data into Parquet.
spark = SparkSession.builder.appName("etl_pipeline").getOrCreate()
df = spark.read.csv("s3://data-source/customers.csv", header=True, inferSchema=True)
df.write.mode("overwrite").parquet("s3://data-warehouse/customers.parquet")
Explanation
Initialize the Spark session – spark = SparkSession.builder.appName("etl_pipeline").getOrCreate() creates a Spark session in Databricks, enabling distributed data processing for large-scale ETL workloads.
Read data from S3 – spark.read.csv("s3://data-source/customers.csv", header=True, inferSchema=True) loads a CSV dataset from Amazon S3, automatically inferring the schema and using the first row as column headers.
Write data to Parquet – df.write.mode("overwrite").parquet("s3://data-warehouse/customers.parquet") saves the data as Parquet files in an S3 data warehouse, optimizing storage (a sketch of an optional transform step follows below).
Ensure efficient querying – Parquet improves read performance, and mode("overwrite") ensures new runs replace old data, preventing duplication issues in analytics queries.
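The pipeline above copies data straight from CSV to Parquet. As a minimal sketch of where a transform step would sit between the read and the write, the snippet below drops rows with a missing key and derives a reporting column; the column names customer_id and signup_date are assumptions for illustration and are not part of the original example.
from pyspark.sql import functions as F

# Hypothetical transform step between read and write; customer_id and signup_date
# are assumed column names used only for illustration.
cleaned = (
    df.dropna(subset=["customer_id"])                           # drop rows missing the key
      .withColumn("signup_year", F.year(F.col("signup_date")))  # derive a reporting column
)
cleaned.write.mode("overwrite").parquet("s3://data-warehouse/customers.parquet")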
Example 2: Training a Machine Learning Model in Databricks
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler

# Load the curated data, build a feature vector, train a churn classifier, and persist it.
df = spark.read.parquet("s3://data-warehouse/customers.parquet")
features = VectorAssembler(inputCols=["age", "income"], outputCol="features").transform(df)
model = LogisticRegression(labelCol="churn", featuresCol="features").fit(features)
model.save("s3://models/churn_model")
Explanation
Load data from Parquet – spark.read.parquet("s3://data-warehouse/customers.parquet") reads structured data from optimized Parquet storage, ensuring fast and efficient retrieval.
Assemble features – VectorAssembler(inputCols=["age", "income"], outputCol="features").transform(df) combines the numerical columns into the single feature vector that Spark ML models require.
Train logistic regression – LogisticRegression(labelCol="churn", featuresCol="features").fit(features) trains a classification model on the customer churn data in Databricks.
Save the trained model – model.save("s3://models/churn_model") persists the trained model so it can be reused for predictions without retraining (a reload-and-score sketch follows below).
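To show why persisting the model matters, here is a minimal sketch of loading it back and scoring data; it reuses only the paths and column names from the example above.
from pyspark.ml.classification import LogisticRegressionModel
from pyspark.ml.feature import VectorAssembler

# Reload the persisted model and score customers without retraining.
reloaded = LogisticRegressionModel.load("s3://models/churn_model")
customers = spark.read.parquet("s3://data-warehouse/customers.parquet")
scored = reloaded.transform(
    VectorAssembler(inputCols=["age", "income"], outputCol="features").transform(customers)
)
scored.select("age", "income", "prediction").show(5)  # prediction column is added by the model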
Example 3: Running a Real-Time Streaming Job in Databricks
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StringType, IntegerType

# Define the message schema, subscribe to a Kafka topic, parse the JSON payload, and print it.
schema = StructType().add("user_id", IntegerType()).add("action", StringType())
stream = (spark.readStream.format("kafka")
          .option("kafka.bootstrap.servers", "broker:9092")  # required by the Kafka source; replace with your brokers
          .option("subscribe", "user_events")
          .load())
parsed = stream.select(from_json(col("value").cast("string"), schema).alias("data")).select("data.*")
parsed.writeStream.format("console").start()
Explanation
Define the streaming schema – StructType().add("user_id", IntegerType()).add("action", StringType()) specifies the structure of incoming Kafka messages for real-time processing.
Connect to Kafka – spark.readStream.format("kafka") with the kafka.bootstrap.servers and subscribe options establishes a streaming connection to the user_events topic to ingest live data into Databricks.
Parse streaming data – from_json(col("value").cast("string"), schema) decodes each Kafka message according to the defined schema, and select("data.*") flattens the result into columns.
Write the stream to the console – parsed.writeStream.format("console").start() outputs the processed stream to the console, enabling real-time monitoring and debugging of incoming user events (a Delta sink sketch follows below).
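The console sink is fine for debugging, but a more typical destination is a table. Below is a minimal sketch of writing the same parsed stream to a Delta table; the output path and checkpoint location are placeholders, not part of the original example.
# Hypothetical Delta sink for the parsed stream; paths are placeholders.
(parsed.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "s3://delta-lake/checkpoints/user_events")  # required for fault-tolerant recovery
    .start("s3://delta-lake/user_events"))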
Example 4: Optimizing Queries Using Databricks Delta Lake
from delta.tables import DeltaTable

# Compact small files and remove stale ones from a Delta table.
delta_table = DeltaTable.forPath(spark, "s3://delta-lake/sales")
delta_table.optimize().executeCompaction()
delta_table.vacuum(168)
explanation
initialize delta table –
delta_table = deltaTable.forPath(spark, "s3://delta-lake/sales")
loads a delta lake table for transactional data management within databricks.run optimization –
delta_table.optimize().executeCompaction()
compacts small files into larger ones, improving query performance by reducing i/o overhead in databricks.clean up old files –
delta_table.vacuum(168)
removes outdated files older than 168 hours (7 days), preventing unnecessary storage costs and maintaining data efficiency.ensure query speed – delta lake optimization ensures faster analytics by structuring data efficiently, enabling low-latency queries on large datasets within databricks.
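Compaction can be combined with Z-ordering so that rows commonly filtered together are stored together. The sketch below assumes a customer_id column purely for illustration; the history call just confirms the operation was recorded in the transaction log.
from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, "s3://delta-lake/sales")

# Z-order by a frequently filtered column (customer_id is an assumed column name)
# so related records are co-located and less data is scanned per query.
delta_table.optimize().executeZOrderBy("customer_id")

# Inspect the last few entries of the transaction log to confirm the OPTIMIZE ran.
delta_table.history(5).select("version", "operation").show()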
These four examples demonstrate essential Databricks functionality, covering ETL pipelines, machine learning, streaming, and Delta Lake optimization.