---
name: ably-realtime
description: Ably real-time messaging patterns, WebSocket channel management, message validation and processing, staleness filtering, error recovery strategies, collaborative editing with drag-and-drop, optimistic updates for voting, real-time board collaboration, and Ably integration best practices for ree-board project
model: sonnet
---
# Ably Real-time Collaboration
## When to Use This Skill
Activate this skill when working on:
- Implementing real-time features
- Setting up Ably channels
- Processing real-time messages
- Building collaborative editing features
- Implementing drag-and-drop with real-time sync
- Handling WebSocket connections
- Managing real-time state updates
- Optimizing real-time performance
## Core Patterns
### Channel Management
**Channel Naming Convention:**
```typescript
// Pattern: `board:{boardId}`
const channelName = `board:${boardId}`;
```
**Setting Up Channels:**
```typescript
"use client";
import { useChannel } from "ably/react";
import { useEffect } from "react";
export function PostChannelComponent({ boardId }: { boardId: string }) {
const { channel } = useChannel(`board:${boardId}`, (message) => {
processMessage(message);
});
return ;
}
```
### Message Processors with Validation
**Critical Pattern:** Always validate messages before processing
```typescript
// lib/realtime/messageProcessors.ts
import { z } from "zod";
// Define message schema
const PostUpdateMessageSchema = z.object({
type: z.literal("post:update"),
postId: z.string(),
content: z.string().min(1).max(1000),
userId: z.string(),
timestamp: z.number(),
});
// Message processor
export const processPostUpdate = (rawData: unknown) => {
try {
// ✅ Validate message structure
const data = PostUpdateMessageSchema.parse(rawData);
// ✅ Check staleness (30s threshold)
const now = Date.now();
const age = now - data.timestamp;
if (age > 30000) {
console.warn("Stale message discarded", {
type: data.type,
age,
postId: data.postId,
});
return;
}
// ✅ Process validated, fresh message
updatePostContent(data.postId, data.content);
} catch (error) {
if (error instanceof z.ZodError) {
console.error("Invalid message structure", {
details: error.errors,
rawData,
});
} else {
console.error("Message processing error", error);
}
}
};
```
### Staleness Filtering
**30-Second Threshold:** Prevents processing old messages after reconnection
```typescript
const STALENESS_THRESHOLD_MS = 30000;
export function isMessageStale(timestamp: number): boolean {
const age = Date.now() - timestamp;
return age > STALENESS_THRESHOLD_MS;
}
// Usage in message handler
useChannel(`board:${boardId}`, (message) => {
const { timestamp } = message.data;
if (isMessageStale(timestamp)) {
console.warn("Dropping stale message", { age: Date.now() - timestamp });
return;
}
processMessage(message.data);
});
```
### Error Recovery Strategies
**Connection Error Handling:**
```typescript
import { useConnectionStateListener } from "ably/react";
export function RealtimeProvider({ children }: { children: React.ReactNode }) {
const [connectionState, setConnectionState] = useState("initialized");
useConnectionStateListener((stateChange) => {
setConnectionState(stateChange.current);
switch (stateChange.current) {
case "connected":
console.log("✅ Connected to Ably");
break;
case "disconnected":
console.warn("⚠️ Disconnected from Ably");
break;
case "suspended":
console.error("❌ Connection suspended");
// Optionally show user notification
break;
case "failed":
console.error("❌ Connection failed");
// Show error message to user
break;
}
});
return (
<>
{connectionState !== "connected" && (
)}
{children}
>
);
}
```
### Publishing Messages
**Always Include Timestamp:**
```typescript
import { useChannel } from "ably/react";
export function usePublishPostUpdate() {
const { channel } = useChannel(`board:${boardId}`);
const publishUpdate = async (postId: string, content: string) => {
await channel.publish("post:update", {
type: "post:update",
postId,
content,
userId: currentUserId,
timestamp: Date.now(), // ✅ Always include timestamp
});
};
return publishUpdate;
}
```
### Optimistic Updates for Voting
**Pattern:** Update UI immediately, sync in background
```typescript
"use client";
import { voteSignal } from "@/lib/signal/postSignals";
import { useChannel } from "ably/react";
export function VoteButton({
postId,
boardId,
}: {
postId: string;
boardId: string;
}) {
const { channel } = useChannel(`board:${boardId}`);
const handleVote = async () => {
// ✅ Optimistic update (immediate UI feedback)
voteSignal.value = {
...voteSignal.value,
[postId]: (voteSignal.value[postId] || 0) + 1,
};
try {
// Persist to database
await submitVote(postId);
// Broadcast to other users
await channel.publish("post:vote", {
type: "post:vote",
postId,
increment: 1,
timestamp: Date.now(),
});
} catch (error) {
// ❌ Rollback on error
voteSignal.value = {
...voteSignal.value,
[postId]: voteSignal.value[postId] - 1,
};
console.error("Vote failed", error);
}
};
return ;
}
```
### Drag-and-Drop Integration
**Lazy-Loaded with Real-Time Sync:**
```typescript
// components/board/PostProvider.tsx
"use client";
import { useChannel } from "ably/react";
import dynamic from "next/dynamic";
// ✅ Lazy load drag-and-drop (reduces initial bundle)
const DragDropArea = dynamic(() => import("./DragDropArea"), { ssr: false });
export function PostProvider({ boardId }: { boardId: string }) {
useChannel(`board:${boardId}`, (message) => {
if (message.name === "post:move") {
handlePostMove(message.data);
}
});
const handleDrop = async (postId: string, newType: PostType) => {
// Update locally
movePostSignal(postId, newType);
// Persist to database
await updatePostType(postId, newType);
// Broadcast to other users
channel.publish("post:move", {
type: "post:move",
postId,
newType,
timestamp: Date.now(),
});
};
return ;
}
```
## Anti-Patterns
### ❌ Not Validating Messages
**Bad:**
```typescript
useChannel(`board:${boardId}`, (message) => {
// ❌ Trusts message data completely
updatePost(message.data.postId, message.data.content);
});
```
**Good:**
```typescript
useChannel(`board:${boardId}`, (message) => {
// ✅ Validates before processing
const validated = PostUpdateSchema.safeParse(message.data);
if (!validated.success) return;
updatePost(validated.data.postId, validated.data.content);
});
```
### ❌ Not Checking Message Staleness
**Bad:**
```typescript
useChannel(channelName, (message) => {
// ❌ Processes all messages, even old ones after reconnect
processMessage(message.data);
});
```
**Good:**
```typescript
useChannel(channelName, (message) => {
// ✅ Filters stale messages
if (isMessageStale(message.data.timestamp)) return;
processMessage(message.data);
});
```
### ❌ Not Handling Connection Errors
**Bad:**
```typescript
// ❌ No error handling
const { channel } = useChannel(channelName);
```
**Good:**
```typescript
// ✅ Monitor connection state
useConnectionStateListener((stateChange) => {
if (stateChange.current === "failed") {
showErrorNotification("Real-time connection lost");
}
});
```
### ❌ Publishing Without Timestamp
**Bad:**
```typescript
channel.publish("update", {
postId,
content,
// ❌ No timestamp for staleness check
});
```
**Good:**
```typescript
channel.publish("update", {
postId,
content,
timestamp: Date.now(), // ✅ Include timestamp
});
```
### ❌ Not Handling Race Conditions
**Bad:**
```typescript
// ❌ Multiple updates could conflict
const handleVote = async () => {
const newCount = currentCount + 1;
await updateVoteCount(postId, newCount);
};
```
**Good:**
```typescript
// ✅ Use atomic increment
const handleVote = async () => {
await db
.update(postTable)
.set({ voteCount: sql`vote_count + 1` })
.where(eq(postTable.id, postId));
};
```
## Integration with Other Skills
- **[rbac-security](../rbac-security/SKILL.md):** Validate messages for security
- **[signal-state-management](../signal-state-management/SKILL.md):** Real-time updates to signals
- **[nextjs-app-router](../nextjs-app-router/SKILL.md):** Client components for real-time features
- **[testing-patterns](../testing-patterns/SKILL.md):** Test message processors with fake timers
## Project-Specific Context
### Key Files
- `lib/realtime/messageProcessors.ts` - Message validation and processing
- `components/board/PostProvider.tsx` - Real-time channel setup
- `components/board/PostChannelComponent.tsx` - Channel subscription
- `lib/realtime/__tests__/` - Message processor tests
### Message Types
**Current Message Types:**
```typescript
type MessageType =
| "post:create"
| "post:update"
| "post:delete"
| "post:move"
| "post:vote"
| "member:join"
| "member:leave";
```
### Channel Structure
**One Channel Per Board:**
- Channel name: `board:{boardId}`
- All board events published to this channel
- Subscribers filter by message type
### Performance Optimizations
1. **Lazy Loading:** Drag-and-drop loaded on demand
2. **Staleness Filter:** Discards messages >30s old
3. **Optimistic Updates:** Immediate UI feedback
4. **Batching:** Vote counts updated atomically
5. **Connection Pooling:** Reuse Ably client instance
### Error Recovery
**Automatic Reconnection:**
- Ably SDK handles reconnection automatically
- Messages buffered during disconnection
- Staleness filter prevents processing old messages after reconnect
**Manual Recovery:**
```typescript
// Refresh data after long disconnection
if (
stateChange.previous === "suspended" &&
stateChange.current === "connected"
) {
await refreshBoardData();
}
```
### Testing
**Mock Ably in Tests:**
```typescript
jest.mock("ably/react", () => ({
useChannel: jest.fn(() => ({
channel: {
publish: jest.fn(),
subscribe: jest.fn(),
},
})),
}));
```
---
**Last Updated:** 2026-01-10