Real-time Communication
Real-time Communication
Section titled “Real-time Communication”Problem and Rationale
Section titled “Problem and Rationale”Customer support requires immediate updates to:
- Display new messages instantly without polling
- Show typing indicators for better UX
- Update conversation status changes in real-time
- Reflect assignee changes and team activity
- Synchronize presence status across agents
- Notify on new conversations
Why Action Cable:
- Native WebSocket support in Zelta Chat backend
- Bi-directional communication
- Automatic reconnection handling
- Channel-based subscription model
- Easy integration with Redux
Implementation: @kesha-antonov/react-native-action-cable
Design
Section titled “Design”WebSocket Architecture
Section titled “WebSocket Architecture”Mobile App ←→ Action Cable ←→ Zelta Chat Server ↓ ↓ ↓Redux Store WebSocket Rails Server ↓ Connection ↓UI Updates ← Events ← Server-Side EventsEvent Types
Section titled “Event Types”Conversation Events:
message.created- New message in conversationmessage.updated- Message edited or status changedconversation.created- New conversation assignedconversation.updated- Conversation metadata changedconversation.status_changed- Status change (open/resolved/pending)conversation.read- Conversation marked as readassignee.changed- Conversation reassigned
User Activity Events:
conversation.typing_on- User started typingconversation.typing_off- User stopped typingpresence.update- Agent availability changed
Contact Events:
contact.updated- Contact information changed
Notification Events:
notification.created- New notificationnotification.deleted- Notification removed
Connection Configuration
Section titled “Connection Configuration”interface ActionCableConfig { pubSubToken: string; // User's WebSocket auth token webSocketUrl: string; // WebSocket endpoint URL accountId: number; // Current account ID userId: number; // Current user ID}Lifecycle and Data Flow
Section titled “Lifecycle and Data Flow”Connection Lifecycle
Section titled “Connection Lifecycle”User Logs In ↓Fetch Profile (includes pubSubToken) ↓Initialize Action Cable Connector ↓Create WebSocket Connection ↓Subscribe to User Channel ↓├─ Connection Established│ ↓│ Receive Welcome Message│ ↓│ Ready for Events│└─ Connection Failed ↓ Automatic Retry (Exponential Backoff) ↓ Reconnect AttemptMessage Received Flow
Section titled “Message Received Flow”Server Sends Event (e.g., message.created) ↓Action Cable Receives Raw Data ↓Event Handler Processes Event Type ↓Transform Data (snake_case → camelCase) ↓Dispatch Redux Action (addOrUpdateMessage) ↓Reducer Updates State ↓Connected Components Re-render ↓UI Shows New MessageTyping Indicator Flow
Section titled “Typing Indicator Flow”User A Types in Conversation ↓Server Broadcasts: conversation.typing_on ↓User B's App Receives Event ↓Extract: { conversation, user } ↓Dispatch: setTypingUsers({ conversationId, user }) ↓Show Typing Indicator: "User A is typing..." ↓After 30s OR typing_off Event ↓Dispatch: removeTypingUser({ conversationId, user }) ↓Hide Typing IndicatorImplementation Notes
Section titled “Implementation Notes”Action Cable Connector
Section titled “Action Cable Connector”Base Implementation (src/utils/baseActionCableConnector.ts):
abstract class BaseActionCableConnector { protected cable: ActionCable.Cable | null = null; protected subscription: ActionCable.Channel | null = null; protected abstract events: { [key: string]: (data: any) => void };
constructor( protected pubSubToken: string, protected webSocketUrl: string, protected accountId: number, protected userId: number, ) { this.createConnection(); }
private createConnection() { this.cable = ActionCable.createConsumer( `${this.webSocketUrl}?pubsub_token=${this.pubSubToken}` );
this.subscription = this.cable.subscriptions.create( { channel: 'RoomChannel', pubsub_token: this.pubSubToken, account_id: this.accountId, user_id: this.userId }, { connected: this.handleConnected, disconnected: this.handleDisconnected, received: this.handleReceived, } ); }
private handleReceived = (data: any) => { const { event } = data; const eventHandler = this.events[event];
if (eventHandler) { eventHandler(data); } else { console.log('Unhandled event:', event); } };
private handleConnected = () => { console.log('Action Cable connected'); };
private handleDisconnected = () => { console.log('Action Cable disconnected'); };
public disconnect() { if (this.subscription) { this.subscription.unsubscribe(); } if (this.cable) { this.cable.disconnect(); } }}Concrete Implementation (src/utils/actionCable.ts):
class ActionCableConnector extends BaseActionCableConnector { private CancelTyping: { [key: number]: NodeJS.Timeout | null } = {}; protected events: { [key: string]: (data: any) => void };
constructor(pubSubToken: string, webSocketUrl: string, accountId: number, userId: number) { super(pubSubToken, webSocketUrl, accountId, userId);
this.events = { 'message.created': this.onMessageCreated, 'message.updated': this.onMessageUpdated, 'conversation.created': this.onConversationCreated, 'conversation.status_changed': this.onStatusChange, 'conversation.typing_on': this.onTypingOn, 'conversation.typing_off': this.onTypingOff, 'contact.updated': this.onContactUpdate, 'notification.created': this.onNotificationCreated, 'presence.update': this.onPresenceUpdate, // ... more events }; }
onMessageCreated = (data: Message) => { const message = transformMessage(data); const { conversation, conversationId } = message;
store.dispatch(updateConversationLastActivity({ lastActivityAt: conversation?.lastActivityAt, conversationId })); store.dispatch(addOrUpdateMessage(message)); };
onTypingOn = (data: TypingData) => { const typingData = transformTypingData(data); const { conversation, user } = typingData;
store.dispatch(setTypingUsers({ conversationId: conversation.id, user })); this.initTimer(typingData); };
private initTimer = (data: TypingData) => { const conversationId = data.conversation.id;
if (this.CancelTyping[conversationId]) { clearTimeout(this.CancelTyping[conversationId]!); }
this.CancelTyping[conversationId] = setTimeout(() => { this.onTypingOff(data); }, 30000); // Auto-clear after 30s };}
export default { init({ pubSubToken, webSocketUrl, accountId, userId }: ActionCableConfig) { return new ActionCableConnector(pubSubToken, webSocketUrl, accountId, userId); },};Integration with App Lifecycle
Section titled “Integration with App Lifecycle”Initialization (src/navigation/tabs/AppTabs.tsx):
const Tabs = () => { const pubSubToken = useAppSelector(selectPubSubToken); const userId = useAppSelector(selectUserId); const accountId = useAppSelector(selectCurrentUserAccountId); const webSocketUrl = useAppSelector(selectWebSocketUrl);
useEffect(() => { initActionCable(); }, []);
const initActionCable = useCallback(async () => { if (pubSubToken && webSocketUrl && accountId && userId) { actionCableConnector.init({ pubSubToken, webSocketUrl, accountId, userId }); } }, [accountId, pubSubToken, userId, webSocketUrl]);
// ... rest of component};Data Transformation
Section titled “Data Transformation”Convert server data from snake_case to camelCase:
import { transformMessage, transformConversation } from '@/utils/camelCaseKeys';
onMessageCreated = (data: Message) => { const message = transformMessage(data); // snake_case → camelCase store.dispatch(addOrUpdateMessage(message));};Transformation Utilities (src/utils/camelCaseKeys.ts):
import camelcaseKeys from 'camelcase-keys';
export const transformMessage = (message: any): Message => { return camelcaseKeys(message, { deep: true });};
export const transformConversation = (conversation: any): Conversation => { return camelcaseKeys(conversation, { deep: true });};Patterns and Anti-Patterns
Section titled “Patterns and Anti-Patterns”✅ Good Patterns:
// Clear timers on cleanupuseEffect(() => { const timer = setTimeout(() => { dispatch(removeTypingUser({ conversationId, user })); }, 30000);
return () => clearTimeout(timer); // ✅ Always cleanup}, []);
// Handle connection stateconst [isConnected, setIsConnected] = useState(false);
handleConnected = () => { setIsConnected(true); // Optionally re-fetch data to ensure sync};
// Debounce frequent eventsconst debouncedUpdate = debounce((data) => { store.dispatch(updatePresence(data));}, 1000);❌ Anti-Patterns:
// DON'T: Forget to cleanup timersthis.CancelTyping[conversationId] = setTimeout(...); // ❌ Memory leak
// DON'T: Mutate state directlyonMessageCreated = (data) => { state.messages.push(data); // ❌ Use Redux dispatch};
// DON'T: Block event handlersonMessageCreated = async (data) => { await someSlowOperation(); // ❌ Keep handlers fast store.dispatch(addMessage(data));};Performance Considerations
Section titled “Performance Considerations”Event Batching
Section titled “Event Batching”For high-frequency events, batch updates:
let pendingUpdates: Message[] = [];let batchTimer: NodeJS.Timeout | null = null;
onMessageCreated = (data: Message) => { pendingUpdates.push(transformMessage(data));
if (!batchTimer) { batchTimer = setTimeout(() => { store.dispatch(addMessages(pendingUpdates)); pendingUpdates = []; batchTimer = null; }, 100); // Batch every 100ms }};Selective Subscriptions
Section titled “Selective Subscriptions”Only subscribe to necessary channels:
// Subscribe only to active conversationif (activeConversationId) { subscription = cable.subscriptions.create({ channel: 'ConversationChannel', conversation_id: activeConversationId, });}Connection Pooling
Section titled “Connection Pooling”Reuse connection across account switches:
let connection: ActionCableConnector | null = null;
export const getConnection = (config: ActionCableConfig) => { if (!connection) { connection = actionCableConnector.init(config); } return connection;};Testing Strategy
Section titled “Testing Strategy”Mock Action Cable
Section titled “Mock Action Cable”jest.mock('@kesha-antonov/react-native-action-cable', () => ({ createConsumer: jest.fn(() => ({ subscriptions: { create: jest.fn((channel, callbacks) => { mockCallbacks = callbacks; return { unsubscribe: jest.fn() }; }), }, disconnect: jest.fn(), })),}));Test Event Handling
Section titled “Test Event Handling”describe('ActionCableConnector', () => { it('should dispatch addOrUpdateMessage on message.created', () => { const connector = new ActionCableConnector(/* config */); const mockMessage = { id: 1, content: 'Hello' };
mockCallbacks.received({ event: 'message.created', ...mockMessage });
expect(store.dispatch).toHaveBeenCalledWith( addOrUpdateMessage(transformMessage(mockMessage)) ); });});Extending This Concept
Section titled “Extending This Concept”Adding New Event Type
Section titled “Adding New Event Type”- Define event handler:
onCustomEvent = (data: CustomEventData) => { const transformed = transformCustomEvent(data); store.dispatch(handleCustomEvent(transformed));};- Register in events map:
this.events = { // ... existing events 'custom.event': this.onCustomEvent,};Sending Events to Server
Section titled “Sending Events to Server”For bi-directional communication:
class ActionCableConnector extends BaseActionCableConnector { sendTypingStatus(conversationId: number, isTyping: boolean) { if (this.subscription) { this.subscription.perform('typing', { conversation_id: conversationId, is_typing: isTyping, }); } }}Connection Health Monitoring
Section titled “Connection Health Monitoring”class ActionCableConnector extends BaseActionCableConnector { private heartbeatTimer: NodeJS.Timeout | null = null; private missedHeartbeats = 0;
private startHeartbeat() { this.heartbeatTimer = setInterval(() => { if (this.subscription) { this.subscription.perform('heartbeat'); this.missedHeartbeats++;
if (this.missedHeartbeats > 3) { this.reconnect(); } } }, 30000); }
private onHeartbeatResponse = () => { this.missedHeartbeats = 0; };
private reconnect() { this.disconnect(); this.createConnection(); }}Further Reading
Section titled “Further Reading”- State Management - Redux integration
- Messaging Feature - Message handling
- Push Notifications - Offline notifications
- Action Cable Documentation