---
name: azure-eventhub-ts
description: Build event streaming applications using Azure Event Hubs SDK for JavaScript (@azure/event-hubs). Use when implementing high-throughput event ingestion, real-time analytics, IoT telemetry, or event-driven architectures with partitioned consumers.
package: "@azure/event-hubs"
---

# Azure Event Hubs SDK for TypeScript

High-throughput event streaming and real-time data ingestion.

## Installation

```bash
npm install @azure/event-hubs @azure/identity
```

For checkpointing with consumer groups:

```bash
npm install @azure/eventhubs-checkpointstore-blob @azure/storage-blob
```

## Environment Variables

```bash
EVENTHUB_NAMESPACE=<your-namespace>.servicebus.windows.net
EVENTHUB_NAME=my-eventhub
STORAGE_ACCOUNT_NAME=<your-storage-account>
STORAGE_CONTAINER_NAME=checkpoints
```

## Authentication

```typescript
import { EventHubProducerClient, EventHubConsumerClient } from "@azure/event-hubs";
import { DefaultAzureCredential } from "@azure/identity";

const fullyQualifiedNamespace = process.env.EVENTHUB_NAMESPACE!;
const eventHubName = process.env.EVENTHUB_NAME!;
const credential = new DefaultAzureCredential();

// Producer
const producer = new EventHubProducerClient(fullyQualifiedNamespace, eventHubName, credential);

// Consumer
const consumer = new EventHubConsumerClient(
  "$Default", // Consumer group
  fullyQualifiedNamespace,
  eventHubName,
  credential
);
```

## Core Workflow

### Send Events

```typescript
const producer = new EventHubProducerClient(namespace, eventHubName, credential);

// Create batch and add events; tryAdd() returns false if the batch is full
const batch = await producer.createBatch();
batch.tryAdd({ body: { temperature: 72.5, deviceId: "sensor-1" } });
batch.tryAdd({ body: { temperature: 68.2, deviceId: "sensor-2" } });

await producer.sendBatch(batch);
await producer.close();
```

### Send to Specific Partition

```typescript
// By partition ID
const idBatch = await producer.createBatch({ partitionId: "0" });

// By partition key (consistent hashing)
const keyedBatch = await producer.createBatch({ partitionKey: "device-123" });
```

### Receive Events (Simple)

```typescript
const consumer = new EventHubConsumerClient("$Default", namespace, eventHubName, credential);

const subscription = consumer.subscribe({
  processEvents: async (events, context) => {
    for (const event of events) {
      console.log(`Partition: ${context.partitionId}, Body: ${JSON.stringify(event.body)}`);
    }
  },
  processError: async (err, context) => {
    console.error(`Error on partition ${context.partitionId}: ${err.message}`);
  },
});

// Stop after some time
setTimeout(async () => {
  await subscription.close();
  await consumer.close();
}, 60000);
```

### Receive with Checkpointing (Production)

```typescript
import { EventHubConsumerClient } from "@azure/event-hubs";
import { ContainerClient } from "@azure/storage-blob";
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob";

const containerClient = new ContainerClient(
  `https://${storageAccount}.blob.core.windows.net/${containerName}`,
  credential
);
const checkpointStore = new BlobCheckpointStore(containerClient);

const consumer = new EventHubConsumerClient(
  "$Default",
  namespace,
  eventHubName,
  credential,
  checkpointStore
);

const subscription = consumer.subscribe({
  processEvents: async (events, context) => {
    for (const event of events) {
      console.log(`Processing: ${JSON.stringify(event.body)}`);
    }
    // Checkpoint after processing batch
    if (events.length > 0) {
      await context.updateCheckpoint(events[events.length - 1]);
    }
  },
  processError: async (err, context) => {
    console.error(`Error: ${err.message}`);
  },
});
```

### Receive from Specific Position

```typescript
import { earliestEventPosition, latestEventPosition } from "@azure/event-hubs";

const subscription = consumer.subscribe({
  processEvents: async (events, context) => { /* ... */ },
  processError: async (err, context) => { /* ...
*/ },
}, {
  startPosition: {
    // Start from beginning (earliestEventPosition is exported by @azure/event-hubs)
    "0": earliestEventPosition,
    // Start from end (new events only)
    "1": latestEventPosition,
    // Start from a specific offset
    "2": { offset: "12345" },
    // Start from a specific time
    "3": { enqueuedOn: new Date("2024-01-01") },
  },
});
```

## Event Hub Properties

```typescript
// Get hub info
const hubProperties = await producer.getEventHubProperties();
console.log(`Partitions: ${hubProperties.partitionIds}`);

// Get partition info
const partitionProperties = await producer.getPartitionProperties("0");
console.log(`Last sequence: ${partitionProperties.lastEnqueuedSequenceNumber}`);
```

## Batch Processing Options

```typescript
const subscription = consumer.subscribe(
  {
    processEvents: async (events, context) => { /* ... */ },
    processError: async (err, context) => { /* ... */ },
  },
  {
    maxBatchSize: 100,        // Max events per batch
    maxWaitTimeInSeconds: 30, // Max wait for a batch to fill
  }
);
```

## Key Types

```typescript
import {
  EventHubProducerClient,
  EventHubConsumerClient,
  EventData,
  ReceivedEventData,
  PartitionContext,
  Subscription,
  SubscriptionEventHandlers,
  CreateBatchOptions,
  EventPosition,
} from "@azure/event-hubs";
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob";
```

## Event Properties

```typescript
// Send with properties
const batch = await producer.createBatch();
batch.tryAdd({
  body: { data: "payload" },
  properties: {
    eventType: "telemetry",
    deviceId: "sensor-1",
  },
  contentType: "application/json",
  correlationId: "request-123",
});

// Access in receiver
consumer.subscribe({
  processEvents: async (events, context) => {
    for (const event of events) {
      console.log(`Type: ${event.properties?.eventType}`);
      console.log(`Sequence: ${event.sequenceNumber}`);
      console.log(`Enqueued: ${event.enqueuedTimeUtc}`);
      console.log(`Offset: ${event.offset}`);
    }
  },
  processError: async (err, context) => {
    console.error(err);
  },
});
```

## Error Handling

```typescript
consumer.subscribe({
  processEvents: async (events, context) => {
    try {
      for (const event of events) {
        await processEvent(event);
      }
      if (events.length > 0) {
        await context.updateCheckpoint(events[events.length - 1]);
      }
    } catch (error) {
      // Don't checkpoint on error - events will be reprocessed
      console.error("Processing failed:", error);
    }
  },
  processError: async (err, context) => {
    if (err.name === "MessagingError") {
      // Transient error - the SDK retries automatically
      console.warn("Transient error:", err.message);
    } else {
      // Fatal error
      console.error("Fatal error:", err);
    }
  },
});
```

## Best Practices

1. **Use checkpointing** - Always checkpoint in production; it gives at-least-once delivery, so make processing idempotent
2. **Batch sends** - Use `createBatch()` for efficient sending, and check the boolean returned by `tryAdd()`
3. **Partition keys** - Use partition keys to ensure ordering for related events
4. **Consumer groups** - Use separate consumer groups for different processing pipelines
5. **Handle errors gracefully** - Don't checkpoint on processing failures
6. **Close clients** - Always close producer/consumer when done
7. **Monitor lag** - Track `lastEnqueuedSequenceNumber` vs the last processed sequence number