Connecter Google Bigtable à FastAPI

Découvrez l’interface async Bigtable pour des performances optimales

Google Bigtable est une base de données NoSQL puissante, conçue pour gérer de très grands volumes de données à faible latence. Dans le contexte d’une API moderne, la combinaison de Bigtable avec FastAPI permet de bénéficier d’une architecture à la fois performante et scalable.

Dans cet article, nous allons détailler une intégration propre et asynchrone de Bigtable avec FastAPI, en nous appuyant sur deux composants : BigtableFactory pour la connexion, et un calculateur de KPI utilisant read_rows pour des lectures efficaces. Chaque partie est accompagnée d’exemples de code concrets, pour illustrer les concepts et faciliter la mise en œuvre.

Google Bigtable en bref

Bigtable est une base orientée colonnes, conçue pour le traitement de grandes volumiétrie de données avec des accès en lecture/écriture ultra-rapides. Elle repose sur un modèle clé-valeur avec des familles de colonnes, chaque cellule étant identifiée par :

  • une row key,
  • un nom de family,
  • un qualifier de colonne,
  • une valeur,
  • un horodatage.

Ce modèle est particulièrement adapté aux données de type time-series ou structurées de façon régulière (KPI, mesures capteurs, logs, etc).

Pourquoi FastAPI et l’async ?

FastAPI est un framework web Python moderne, très rapide à exécuter et à développer, basé sur les coroutines async. Il permet de créer des APIs hautement performantes avec de la documentation OpenAPI intégrée.

Le SDK officiel de Bigtable est historiquement synchrone, mais de nombreuses opérations comme read_rows peuvent être utilisées de manière performante sans thread, à condition de bien structurer l’accès aux données.

Créer une fabrique de connexion Bigtable

Une bonne pratique consiste à centraliser la création des objets Bigtable (client, instance, table) dans une fabrique. Cela permet de contrôler la réutilisation des connexions et d’éviter la création redondante d’objets à chaque requête.

from google.cloud import bigtable
from google.cloud.bigtable import Client

class BigtableFactory:
    def __init__(self, project_id: str, instance_id: str):
        self.project_id = project_id
        self.instance_id = instance_id
        self._client = None
        self._instance = None

    def get_client(self) -> Client:
        if not self._client:
            self._client = bigtable.Client(project=self.project_id, admin=True)
        return self._client

    def get_instance(self):
        if not self._instance:
            self._instance = self.get_client().instance(self.instance_id)
        return self._instance

    def get_table(self, table_id: str):
        return self.get_instance().table(table_id)

Lire des données Bigtable avec read_rows

La méthode read_rows permet de lire plusieurs lignes d’un seul coup, de manière efficace, en spécifiant des plages de clés (RowRange) et des filtres (RowFilterChain). Cela permet d’éviter des lectures ligne par ligne, coûteuses en temps.

Exemple : lecture des événements « cpu_usage » pour les ressources entre res-001 et res-100 :

from google.cloud.bigtable.row_set import RowRange
from google.cloud.bigtable.row_filters import RowFilterChain, FamilyNameRegexFilter, ColumnQualifierRegexFilter
from google.cloud.bigtable.read_rows_query import ReadRowsQuery

start_key = "kpi#res-001"
end_key = "kpi#res-100"

query = ReadRowsQuery(
    row_ranges=[
        RowRange(start_key=start_key.encode(), end_key=end_key.encode())
    ],
    row_filter=RowFilterChain(
        filters=[
            FamilyNameRegexFilter(b"metrics"),
            ColumnQualifierRegexFilter(b"cpu_usage")
        ]
    ),
)

rows = table.read_rows(query=query)
for row in rows:
    print(row.row_key, row.cells)

Ce type de requête est optimal pour des recherches par lots ou par préfixes communs.

Endpoint complet : POST /api/data/compute

Avant de définir l’endpoint, voici l’initialisation de l’application FastAPI et de la table Bigtable utilisée dans les requêtes :

from fastapi import FastAPI
from bigtable_factory import BigtableFactory

app = FastAPI()

# Initialisation de la fabrique Bigtable
factory = BigtableFactory(project_id="my-project-id", instance_id="my-instance-id")

# Récupération de la table cible
table = factory.get_table("kpi-data")

Cet endpoint expose une API REST permettant de demander des KPI sur une ressource, pour une liste de métriques et une plage temporelle. Il applique les filtres de façon structurée dans une requête Bigtable.

from pydantic import BaseModel
from datetime import datetime
from fastapi import HTTPException
from google.cloud.bigtable.row_filters import RowFilterChain, FamilyNameRegexFilter, ColumnQualifierRegexFilter
from google.cloud.bigtable.row_set import RowSet, RowRange
from google.cloud.bigtable.read_rows_query import ReadRowsQuery

class KPIRequest(BaseModel):
    application: str
    format: str
    start_datetime: datetime
    end_datetime: datetime
    resource: str
    metrics: list[str]

@app.post("/api/data/compute")
def compute_kpi(request: KPIRequest):
    # On définit la plage de clés avec start et end basés sur le même préfixe
    start_key = f"kpi#{request.resource}"
    end_key = f"kpi#{request.resource}0"

    # Construction de la chaîne de filtres
    metric_filters = [ColumnQualifierRegexFilter(m.encode()) for m in request.metrics]
    filter_chain = RowFilterChain(
        filters=[FamilyNameRegexFilter(b"metrics")] + metric_filters
    )

    # Construction de la requête Bigtable
    query = ReadRowsQuery(
        row_ranges=[RowRange(start_key=start_key.encode(), end_key=end_key.encode())],
        row_filter=filter_chain
    )

    # Lecture des résultats
    rows = table.read_rows(query=query)

    result = {}
    for row in rows:
        for metric in request.metrics:
            cells = row.cells.get("metrics", {}).get(metric.encode(), [])
            filtered = [cell for cell in cells if request.start_datetime <= cell.timestamp <= request.end_datetime]
            result[metric] = [float(cell.value.decode()) for cell in filtered]

    if not result:
        raise HTTPException(status_code=404, detail="No KPI data found")

    return {"results": result}

Cette approche permet de mutualiser les lectures, de filtrer les colonnes pertinentes et de limiter le volume de données ramenées. Elle est idéale pour des dashboards dynamiques.

Bonnes pratiques

  • ✅ Centraliser les connexions : éviter de recréer les objets ClientInstanceTable dans chaque requête
  • ✅ Utiliser des filtres efficaces : pour limiter les transferts de données
  • ✅ Analyser les performances : surveiller la taille des plages interrogées
  • ⚠️ Gérer les erreurs : Bigtable ne renvoie pas toujours d’erreur explicite en cas de clé inexistante

Conclusion

L’intégration de Google Bigtable à FastAPI ouvre la voie à une API moderne, scalable et performante. En exploitant pleinement l’interface read_rows dans une logique asynchrone, vous optimisez l’accès aux données de type KPI et garantissez une excellente réactivité de votre application. Cette approche convient aussi bien aux dashboards temps réel qu’aux traitements analytiques distribués.


Vous souhaitez transformer vos données en un avantage compétitif avec des applications avancées ? Contactez-nous pour discuter de la manière dont nous pouvons intégrer ces technologies dans vos projets et vous aider à optimiser vos processus métier, tout en améliorant l’expérience utilisateur et la qualité de vos services.

Retour en haut
Consentement à l'utilisation de Cookies avec Real Cookie Banner