Connecter Google Bigtable à FastAPI
Découvrez l’interface async Bigtable pour des performances optimales
Google Bigtable est une base de données NoSQL puissante, conçue pour gérer de très grands volumes de données à faible latence. Dans le contexte d’une API moderne, la combinaison de Bigtable avec FastAPI permet de bénéficier d’une architecture à la fois performante et scalable.
Dans cet article, nous allons détailler une intégration propre et asynchrone de Bigtable avec FastAPI, en nous appuyant sur deux composants : BigtableFactory
pour la connexion, et un calculateur de KPI utilisant read_rows
pour des lectures efficaces. Chaque partie est accompagnée d’exemples de code concrets, pour illustrer les concepts et faciliter la mise en œuvre.
Google Bigtable en bref
Bigtable est une base orientée colonnes, conçue pour le traitement de grandes volumiétrie de données avec des accès en lecture/écriture ultra-rapides. Elle repose sur un modèle clé-valeur avec des familles de colonnes, chaque cellule étant identifiée par :
- une
row key
, - un nom de
family
, - un
qualifier
de colonne, - une
valeur
, - un
horodatage
.
Ce modèle est particulièrement adapté aux données de type time-series ou structurées de façon régulière (KPI, mesures capteurs, logs, etc).
Pourquoi FastAPI et l’async ?
FastAPI est un framework web Python moderne, très rapide à exécuter et à développer, basé sur les coroutines async
. Il permet de créer des APIs hautement performantes avec de la documentation OpenAPI intégrée.
Le SDK officiel de Bigtable est historiquement synchrone, mais de nombreuses opérations comme read_rows
peuvent être utilisées de manière performante sans thread, à condition de bien structurer l’accès aux données.
Créer une fabrique de connexion Bigtable
Une bonne pratique consiste à centraliser la création des objets Bigtable (client, instance, table) dans une fabrique. Cela permet de contrôler la réutilisation des connexions et d’éviter la création redondante d’objets à chaque requête.
from google.cloud import bigtable
from google.cloud.bigtable import Client
class BigtableFactory:
def __init__(self, project_id: str, instance_id: str):
self.project_id = project_id
self.instance_id = instance_id
self._client = None
self._instance = None
def get_client(self) -> Client:
if not self._client:
self._client = bigtable.Client(project=self.project_id, admin=True)
return self._client
def get_instance(self):
if not self._instance:
self._instance = self.get_client().instance(self.instance_id)
return self._instance
def get_table(self, table_id: str):
return self.get_instance().table(table_id)
Lire des données Bigtable avec read_rows
La méthode read_rows
permet de lire plusieurs lignes d’un seul coup, de manière efficace, en spécifiant des plages de clés (RowRange
) et des filtres (RowFilterChain
). Cela permet d’éviter des lectures ligne par ligne, coûteuses en temps.
Exemple : lecture des événements « cpu_usage » pour les ressources entre res-001
et res-100
:
from google.cloud.bigtable.row_set import RowRange
from google.cloud.bigtable.row_filters import RowFilterChain, FamilyNameRegexFilter, ColumnQualifierRegexFilter
from google.cloud.bigtable.read_rows_query import ReadRowsQuery
start_key = "kpi#res-001"
end_key = "kpi#res-100"
query = ReadRowsQuery(
row_ranges=[
RowRange(start_key=start_key.encode(), end_key=end_key.encode())
],
row_filter=RowFilterChain(
filters=[
FamilyNameRegexFilter(b"metrics"),
ColumnQualifierRegexFilter(b"cpu_usage")
]
),
)
rows = table.read_rows(query=query)
for row in rows:
print(row.row_key, row.cells)
Ce type de requête est optimal pour des recherches par lots ou par préfixes communs.
Endpoint complet : POST /api/data/compute
Avant de définir l’endpoint, voici l’initialisation de l’application FastAPI et de la table Bigtable utilisée dans les requêtes :
from fastapi import FastAPI
from bigtable_factory import BigtableFactory
app = FastAPI()
# Initialisation de la fabrique Bigtable
factory = BigtableFactory(project_id="my-project-id", instance_id="my-instance-id")
# Récupération de la table cible
table = factory.get_table("kpi-data")
Cet endpoint expose une API REST permettant de demander des KPI sur une ressource, pour une liste de métriques et une plage temporelle. Il applique les filtres de façon structurée dans une requête Bigtable.
from pydantic import BaseModel
from datetime import datetime
from fastapi import HTTPException
from google.cloud.bigtable.row_filters import RowFilterChain, FamilyNameRegexFilter, ColumnQualifierRegexFilter
from google.cloud.bigtable.row_set import RowSet, RowRange
from google.cloud.bigtable.read_rows_query import ReadRowsQuery
class KPIRequest(BaseModel):
application: str
format: str
start_datetime: datetime
end_datetime: datetime
resource: str
metrics: list[str]
@app.post("/api/data/compute")
def compute_kpi(request: KPIRequest):
# On définit la plage de clés avec start et end basés sur le même préfixe
start_key = f"kpi#{request.resource}"
end_key = f"kpi#{request.resource}0"
# Construction de la chaîne de filtres
metric_filters = [ColumnQualifierRegexFilter(m.encode()) for m in request.metrics]
filter_chain = RowFilterChain(
filters=[FamilyNameRegexFilter(b"metrics")] + metric_filters
)
# Construction de la requête Bigtable
query = ReadRowsQuery(
row_ranges=[RowRange(start_key=start_key.encode(), end_key=end_key.encode())],
row_filter=filter_chain
)
# Lecture des résultats
rows = table.read_rows(query=query)
result = {}
for row in rows:
for metric in request.metrics:
cells = row.cells.get("metrics", {}).get(metric.encode(), [])
filtered = [cell for cell in cells if request.start_datetime <= cell.timestamp <= request.end_datetime]
result[metric] = [float(cell.value.decode()) for cell in filtered]
if not result:
raise HTTPException(status_code=404, detail="No KPI data found")
return {"results": result}
Cette approche permet de mutualiser les lectures, de filtrer les colonnes pertinentes et de limiter le volume de données ramenées. Elle est idéale pour des dashboards dynamiques.
Bonnes pratiques
- ✅ Centraliser les connexions : éviter de recréer les objets
Client
,Instance
,Table
dans chaque requête - ✅ Utiliser des filtres efficaces : pour limiter les transferts de données
- ✅ Analyser les performances : surveiller la taille des plages interrogées
- ⚠️ Gérer les erreurs : Bigtable ne renvoie pas toujours d’erreur explicite en cas de clé inexistante
Conclusion
L’intégration de Google Bigtable à FastAPI ouvre la voie à une API moderne, scalable et performante. En exploitant pleinement l’interface read_rows
dans une logique asynchrone, vous optimisez l’accès aux données de type KPI et garantissez une excellente réactivité de votre application. Cette approche convient aussi bien aux dashboards temps réel qu’aux traitements analytiques distribués.
Vous souhaitez transformer vos données en un avantage compétitif avec des applications avancées ? Contactez-nous pour discuter de la manière dont nous pouvons intégrer ces technologies dans vos projets et vous aider à optimiser vos processus métier, tout en améliorant l’expérience utilisateur et la qualité de vos services.