Skip to content

Commit 9142335

Browse files
authored
Merge pull request #1169 from trheyi/main
Enhance messenger service with message handler registration and webho…
2 parents 6d749cd + ffc666c commit 9142335

File tree

4 files changed

+659
-4
lines changed

4 files changed

+659
-4
lines changed

messenger/messenger.go

Lines changed: 174 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"fmt"
66
"os"
77
"path/filepath"
8+
"reflect"
89
"regexp"
910
"strings"
1011
"sync"
@@ -35,6 +36,7 @@ type Service struct {
3536
channels map[string]types.Channel
3637
defaults map[string]string
3738
receivers map[string]context.CancelFunc // Active mail receivers by provider name
39+
messageHandlers []types.MessageHandler // Registered message handlers for OnReceive
3840
mutex sync.RWMutex
3941
}
4042

@@ -106,6 +108,7 @@ func Load(cfg config.Config) error {
106108
channels: make(map[string]types.Channel),
107109
defaults: config.Defaults,
108110
receivers: make(map[string]context.CancelFunc),
111+
messageHandlers: make([]types.MessageHandler, 0),
109112
}
110113

111114
// Set global instance
@@ -605,8 +608,12 @@ func (m *Service) startMailReceivers() {
605608
err := mp.StartMailReceiver(ctx, func(msg *types.Message) error {
606609
log.Info("[Messenger] Received email via %s: Subject=%s, From=%s", providerName, msg.Subject, msg.From)
607610

608-
// Here you can add custom message processing logic
609-
// For now, just log the received message
611+
// Trigger OnReceive handlers for the received message
612+
if err := m.triggerOnReceiveHandlers(ctx, msg); err != nil {
613+
log.Error("[Messenger] Failed to trigger OnReceive handlers: %v", err)
614+
return err
615+
}
616+
610617
return nil
611618
})
612619

@@ -668,3 +675,168 @@ func (m *Service) GetActiveReceivers() []string {
668675
}
669676
return receivers
670677
}
678+
679+
// OnReceive registers a message handler for received messages
680+
// Multiple handlers can be registered and will be called in order
681+
func (m *Service) OnReceive(handler types.MessageHandler) error {
682+
if handler == nil {
683+
return fmt.Errorf("handler cannot be nil")
684+
}
685+
686+
m.mutex.Lock()
687+
defer m.mutex.Unlock()
688+
689+
m.messageHandlers = append(m.messageHandlers, handler)
690+
log.Info("[Messenger] Registered new message handler (total: %d)", len(m.messageHandlers))
691+
return nil
692+
}
693+
694+
// RemoveReceiveHandler removes a previously registered message handler
695+
func (m *Service) RemoveReceiveHandler(handler types.MessageHandler) error {
696+
if handler == nil {
697+
return fmt.Errorf("handler cannot be nil")
698+
}
699+
700+
m.mutex.Lock()
701+
defer m.mutex.Unlock()
702+
703+
// Find and remove the handler by comparing function pointers
704+
handlerPtr := reflect.ValueOf(handler).Pointer()
705+
for i, existingHandler := range m.messageHandlers {
706+
if reflect.ValueOf(existingHandler).Pointer() == handlerPtr {
707+
// Remove handler at index i
708+
m.messageHandlers = append(m.messageHandlers[:i], m.messageHandlers[i+1:]...)
709+
log.Info("[Messenger] Removed message handler (remaining: %d)", len(m.messageHandlers))
710+
return nil
711+
}
712+
}
713+
714+
return fmt.Errorf("handler not found")
715+
}
716+
717+
// TriggerWebhook processes incoming webhook data and triggers OnReceive handlers
718+
// This is used by OPENAPI endpoints to handle incoming messages
719+
func (m *Service) TriggerWebhook(ctx context.Context, providerName string, data map[string]interface{}) error {
720+
// Get the provider to process the webhook data
721+
provider, exists := m.providers[providerName]
722+
if !exists {
723+
return fmt.Errorf("provider not found: %s", providerName)
724+
}
725+
726+
// First, let the provider process the webhook data
727+
// This may convert webhook data into a standardized message format
728+
err := provider.Receive(ctx, data)
729+
if err != nil {
730+
log.Warn("[Messenger] Provider %s failed to process webhook data: %v", providerName, err)
731+
// Continue to trigger handlers even if provider processing fails
732+
}
733+
734+
// Try to convert webhook data to a Message for OnReceive handlers
735+
message, err := m.convertWebhookToMessage(providerName, data)
736+
if err != nil {
737+
log.Warn("[Messenger] Failed to convert webhook data to message: %v", err)
738+
return err
739+
}
740+
741+
// Trigger all registered OnReceive handlers
742+
return m.triggerOnReceiveHandlers(ctx, message)
743+
}
744+
745+
// convertWebhookToMessage attempts to convert webhook data to a standardized Message
746+
func (m *Service) convertWebhookToMessage(providerName string, data map[string]interface{}) (*types.Message, error) {
747+
message := &types.Message{
748+
Metadata: make(map[string]interface{}),
749+
}
750+
751+
// Add provider information
752+
message.Metadata["provider"] = providerName
753+
message.Metadata["webhook_data"] = data
754+
755+
// Try to extract common fields from webhook data
756+
if subject, ok := data["subject"].(string); ok {
757+
message.Subject = subject
758+
}
759+
if from, ok := data["from"].(string); ok {
760+
message.From = from
761+
}
762+
if body, ok := data["body"].(string); ok {
763+
message.Body = body
764+
}
765+
if html, ok := data["html"].(string); ok {
766+
message.HTML = html
767+
}
768+
769+
// Handle "to" field which might be string or array
770+
if to, ok := data["to"]; ok {
771+
switch v := to.(type) {
772+
case string:
773+
message.To = []string{v}
774+
case []string:
775+
message.To = v
776+
case []interface{}:
777+
for _, item := range v {
778+
if str, ok := item.(string); ok {
779+
message.To = append(message.To, str)
780+
}
781+
}
782+
}
783+
}
784+
785+
// Determine message type based on provider or data
786+
if msgType, ok := data["type"].(string); ok {
787+
message.Type = types.MessageType(strings.ToLower(msgType))
788+
} else {
789+
// Default based on provider type
790+
provider, exists := m.providers[providerName]
791+
if exists {
792+
switch strings.ToLower(provider.GetType()) {
793+
case "mailer", "smtp", "mailgun":
794+
message.Type = types.MessageTypeEmail
795+
case "twilio":
796+
// Could be SMS, WhatsApp, or Email - try to determine from data
797+
if phone, ok := data["phone"].(string); ok && phone != "" {
798+
message.Type = types.MessageTypeSMS
799+
} else if whatsapp, ok := data["whatsapp"].(string); ok && whatsapp != "" {
800+
message.Type = types.MessageTypeWhatsApp
801+
} else {
802+
message.Type = types.MessageTypeEmail
803+
}
804+
default:
805+
message.Type = types.MessageTypeEmail // Default fallback
806+
}
807+
}
808+
}
809+
810+
return message, nil
811+
}
812+
813+
// triggerOnReceiveHandlers calls all registered OnReceive handlers
814+
func (m *Service) triggerOnReceiveHandlers(ctx context.Context, message *types.Message) error {
815+
m.mutex.RLock()
816+
handlers := make([]types.MessageHandler, len(m.messageHandlers))
817+
copy(handlers, m.messageHandlers)
818+
m.mutex.RUnlock()
819+
820+
if len(handlers) == 0 {
821+
log.Debug("[Messenger] No OnReceive handlers registered")
822+
return nil
823+
}
824+
825+
log.Info("[Messenger] Triggering %d OnReceive handlers for message: %s", len(handlers), message.Subject)
826+
827+
var errors []string
828+
for i, handler := range handlers {
829+
err := handler(ctx, message)
830+
if err != nil {
831+
errMsg := fmt.Sprintf("handler %d failed: %v", i, err)
832+
errors = append(errors, errMsg)
833+
log.Error("[Messenger] %s", errMsg)
834+
}
835+
}
836+
837+
if len(errors) > 0 {
838+
return fmt.Errorf("some OnReceive handlers failed: %s", strings.Join(errors, "; "))
839+
}
840+
841+
return nil
842+
}

0 commit comments

Comments
 (0)