JupyterLab¶
O JupyterLab serve como ambiente de desenvolvimento interativo para o FlumenData, fornecendo notebooks, exploração de dados e integração direta com o cluster Spark.
Visão Geral¶
Imagem: flumendata/jupyterlab:spark-4.0.1 (construída customizada)
Base: jupyter/scipy-notebook:python-3.10
Porta: 8888 (UI Web)
Python: 3.10
PySpark: 4.0.1 (corresponde ao cluster)
Delta Lake: 4.0.0 (corresponde ao cluster)
Arquitetura¶
O JupyterLab fornece: - Notebooks Interativos: Notebooks Jupyter com kernels Python, PySpark e SQL - Integração Spark: Conexão direta ao cluster Spark FlumenData (modo cliente) - Suporte Delta Lake: Leitura e escrita de tabelas Delta com garantias ACID - Acesso Hive Metastore: Consulta e gerenciamento de bancos e tabelas - Integração S3/MinIO: Acesso direto ao armazenamento data lake - Stack Data Science: pandas, matplotlib, seaborn, plotly para análise e visualização
Comandos Python CLI¶
# Construir e iniciar
python3 flumen rebuild --service jupyterlab # Construir imagem customizada JupyterLab
python3 flumen up --tier 2 # Iniciar JupyterLab e outros serviços Tier 2
# Acesso
python3 flumen token-jupyterlab # Obter token de acesso JupyterLab
python3 flumen logs --service jupyterlab # Ver logs do JupyterLab
# Desenvolvimento
python3 flumen shell-jupyterlab # Abrir shell bash
# Testes
python3 flumen test --service jupyterlab # Testar integração Spark
python3 flumen health --service jupyterlab # Verificar saúde do serviço
# Gerenciamento
python3 flumen restart # Reiniciar serviço
python3 flumen down --tier 2 # Parar serviços Tier 2
Primeiros Passos¶
1. Iniciar JupyterLab¶
2. Obter Token de Acesso¶
3. Acessar UI Web¶
Abra http://localhost:8888 e digite o token.
Usando PySpark em Notebooks¶
Criar Sessão Spark¶
from pyspark.sql import SparkSession
# Criar sessão Spark conectada ao cluster FlumenData
spark = SparkSession.builder \
.appName("JupyterLab-Notebook") \
.master("spark://spark-master:7077") \
.config("spark.submit.deployMode", "client") \
.config("spark.driver.memory", "2g") \
.getOrCreate()
# Verificar conexão
print(f"Versão Spark: {spark.version}")
print(f"Implementação de catálogo: {spark.conf.get('spark.sql.catalogImplementation')}")
Trabalhando com Bancos de Dados e Tabelas¶
# Mostrar bancos de dados disponíveis
spark.sql("SHOW DATABASES").show()
# Criar um novo banco de dados
spark.sql("CREATE DATABASE IF NOT EXISTS analytics")
# Usar o banco de dados
spark.sql("USE analytics")
# Criar uma tabela Delta
spark.sql("""
CREATE TABLE IF NOT EXISTS customers (
customer_id INT,
name STRING,
email STRING,
country STRING,
signup_date DATE,
lifetime_value DECIMAL(10,2)
) USING DELTA
LOCATION 's3a://lakehouse/warehouse/analytics.db/customers'
""")
# Inserir dados de exemplo
spark.sql("""
INSERT INTO customers VALUES
(1, 'Alice Smith', 'alice@example.com', 'USA', '2024-01-15', 1250.50),
(2, 'Bob Johnson', 'bob@example.com', 'Canada', '2024-02-20', 890.25),
(3, 'Carol Davis', 'carol@example.com', 'UK', '2024-03-10', 2100.00)
""")
# Consultar dados
df = spark.sql("SELECT * FROM customers WHERE lifetime_value > 1000")
df.show()
API DataFrame¶
from pyspark.sql.functions import col, avg, count, sum as spark_sum
# Ler tabela Delta como DataFrame
customers_df = spark.table("analytics.customers")
# Exploração de dados
customers_df.printSchema()
customers_df.describe().show()
# Agregações
country_stats = customers_df.groupBy("country") \
.agg(
count("*").alias("customer_count"),
avg("lifetime_value").alias("avg_ltv"),
spark_sum("lifetime_value").alias("total_ltv")
) \
.orderBy(col("total_ltv").desc())
country_stats.show()
Viagem no Tempo com Delta Lake¶
# Ver histórico da tabela
spark.sql("DESCRIBE HISTORY analytics.customers").show(truncate=False)
# Consultar em timestamp específico
df_yesterday = spark.read \
.format("delta") \
.option("timestampAsOf", "2024-11-09 10:00:00") \
.table("analytics.customers")
# Consultar versão específica
df_v0 = spark.read \
.format("delta") \
.option("versionAsOf", 0) \
.table("analytics.customers")
# Restaurar tabela para versão anterior
spark.sql("RESTORE TABLE analytics.customers TO VERSION AS OF 2")
Análise de Dados e Visualização¶
Usando pandas¶
# Converter DataFrame Spark para pandas
pandas_df = spark.table("analytics.customers").toPandas()
# Análise pandas
print(pandas_df.describe())
print(pandas_df.groupby('country')['lifetime_value'].mean())
Visualização¶
import matplotlib.pyplot as plt
import seaborn as sns
# Definir estilo
sns.set_theme(style="whitegrid")
# Consultar dados
df = spark.sql("""
SELECT country, COUNT(*) as customer_count, AVG(lifetime_value) as avg_ltv
FROM analytics.customers
GROUP BY country
""").toPandas()
# Criar gráficos
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 5))
# Gráfico de barras
sns.barplot(data=df, x='country', y='customer_count', ax=ax1)
ax1.set_title('Clientes por País')
ax1.set_ylabel('Contagem de Clientes')
# LTV Médio
sns.barplot(data=df, x='country', y='avg_ltv', ax=ax2)
ax2.set_title('Valor Médio de Lifetime por País')
ax2.set_ylabel('LTV Médio ($)')
plt.tight_layout()
plt.show()
Pacotes Python Instalados¶
O JupyterLab vem com uma stack completa de ciência de dados:
Core: - pyspark 4.0.1 - delta-spark 4.0.0
Processamento de Dados: - pandas 2.2.2 - pyarrow 16.1.0
Visualização: - matplotlib 3.9.0 - seaborn 0.13.2 - plotly 5.22.0
Armazenamento: - boto3 1.34.144 - s3fs 2024.6.1
Banco de Dados: - psycopg2-binary 2.9.9 - sqlalchemy 2.0.31
Jupyter: - jupyterlab-git 0.50.1 - ipywidgets 8.1.3
Armazenamento Persistente¶
Notebooks e arquivos são armazenados no volume nomeado flumen_jupyter_notebooks e persistem através de reinicializações do contêiner.
Acessar notebooks em: /home/jovyan/work
Diretório de dados compartilhados: /home/jovyan/shared
Solução de Problemas¶
Não consegue conectar ao cluster Spark¶
Verificar se o Spark master está rodando:
Verificar conectividade de rede:
Token de acesso perdido¶
Recuperar token dos logs:
Ou da lista de servidores:
Próximos Passos¶
- Criar dashboards no Superset
- Consultar tabelas refinadas via Trino
- Aprender sobre otimização Delta Lake