C++观察者模式与事件回调结合使用(观察者.回调.模式.事件...)

wufei123 发布于 2025-09-11 阅读(1)
结合观察者模式与事件回调可构建灵活解耦的事件系统,通过定义事件类型、创建发布者与观察者、注册回调函数及触发事件实现;为避免循环依赖,可采用事件分级、过滤、依赖注入等策略;在多线程环境下,需使用线程安全数据结构、事件队列和锁机制保障并发安全;Lambda表达式可简化回调注册,提升代码简洁性与可读性。

c++观察者模式与事件回调结合使用

C++观察者模式和事件回调结合使用,可以构建灵活且解耦的事件处理系统。观察者模式定义了对象之间的一对多依赖关系,当一个对象的状态发生改变时,所有依赖于它的对象都会得到通知并自动更新。事件回调则是一种更直接的函数调用机制,允许对象在特定事件发生时执行预定义的回调函数。结合使用这两种模式,既能实现观察者模式的广播通知,又能利用事件回调的精确控制,从而实现更细粒度的事件处理。

解决方案

将观察者模式与事件回调结合,通常涉及以下几个步骤:

  1. 定义事件类型: 首先,定义需要通知的事件类型。这可以是一个枚举类,也可以是字符串常量。

  2. 创建事件发布者(Subject): 事件发布者维护一个观察者列表,并提供注册和取消注册观察者的方法。此外,发布者还需要提供触发特定事件的方法,该方法会遍历观察者列表,并调用每个观察者注册的、与该事件类型关联的回调函数。

  3. 创建观察者(Observer): 观察者实现一个或多个回调函数,用于处理来自发布者的事件通知。观察者需要向发布者注册,指定其感兴趣的事件类型以及对应的回调函数。

  4. 事件回调函数注册: 观察者通过发布者提供的注册方法,将事件类型和回调函数关联起来。

  5. 事件触发: 当发布者的状态发生改变,需要通知观察者时,它会调用相应的事件触发方法。该方法会遍历观察者列表,找到注册了该事件类型的观察者,并调用它们的回调函数。

#include <iostream>
#include <vector>
#include <map>
#include <functional>

// 事件类型枚举
enum class EventType {
  DATA_READY,
  DATA_PROCESSED,
  ERROR
};

// 观察者接口 (可选,如果每个观察者需要实现相同的接口)
class Observer {
public:
  virtual void onEvent(EventType type, const std::string& data) = 0;
};

// 事件发布者
class Subject {
public:
  using Callback = std::function<void(const std::string&)>;

  void registerObserver(EventType type, Callback callback) {
    observers_[type].push_back(callback);
  }

  void unregisterObserver(EventType type, Callback callback) {
    //  简化:这里只移除第一个匹配的回调函数。实际使用中可能需要更复杂的逻辑
    auto& callbacks = observers_[type];
    for (auto it = callbacks.begin(); it != callbacks.end(); ++it) {
      if (*it == callback) {
        callbacks.erase(it);
        break;
      }
    }
  }

  void notify(EventType type, const std::string& data) {
    for (const auto& callback : observers_[type]) {
      callback(data);
    }
  }

private:
  std::map<EventType, std::vector<Callback>> observers_;
};

// 具体观察者
class DataProcessor {
public:
  DataProcessor(Subject& subject) : subject_(subject) {
    subject_.registerObserver(EventType::DATA_READY,
                             std::bind(&DataProcessor::onDataReady, this, std::placeholders::_1));
    subject_.registerObserver(EventType::ERROR,
                             std::bind(&DataProcessor::onError, this, std::placeholders::_1));
  }

  ~DataProcessor() {
    subject_.unregisterObserver(EventType::DATA_READY,
                               std::bind(&DataProcessor::onDataReady, this, std::placeholders::_1));
    subject_.unregisterObserver(EventType::ERROR,
                               std::bind(&DataProcessor::onError, this, std::placeholders::_1));
  }

  void onDataReady(const std::string& data) {
    std::cout << "DataProcessor received data: " << data << std::endl;
    //  模拟数据处理
    std::cout << "Processing data..." << std::endl;
    //  处理后触发另一个事件
    subject_.notify(EventType::DATA_PROCESSED, "Processed: " + data);
  }

  void onError(const std::string& errorMessage) {
    std::cerr << "DataProcessor received error: " << errorMessage << std::endl;
  }

private:
  Subject& subject_;
};

class DataConsumer {
public:
    DataConsumer(Subject& subject) : subject_(subject) {
        subject_.registerObserver(EventType::DATA_PROCESSED,
                                 std::bind(&DataConsumer::onDataProcessed, this, std::placeholders::_1));
    }

    ~DataConsumer() {
        subject_.unregisterObserver(EventType::DATA_PROCESSED,
                                   std::bind(&DataConsumer::onDataProcessed, this, std::placeholders::_1));
    }

    void onDataProcessed(const std::string& data) {
        std::cout << "DataConsumer received processed data: " << data << std::endl;
    }

private:
    Subject& subject_;
};


int main() {
  Subject subject;
  DataProcessor processor(subject);
  DataConsumer consumer(subject);

  // 模拟数据准备好
  subject.notify(EventType::DATA_READY, "Raw data from sensor");

  // 模拟发生错误
  subject.notify(EventType::ERROR, "File not found");

  return 0;
}
如何避免观察者模式中的循环依赖?

循环依赖是指观察者A依赖于观察者B,而观察者B又依赖于观察者A的情况。这会导致无限循环的事件触发,最终导致程序崩溃。

避免循环依赖的几种策略:

  • 事件分级: 将事件划分为不同的级别,确保低级别事件的观察者不会触发高级别事件。例如,可以将数据处理事件分为“数据校验”、“数据转换”、“数据存储”等级别,确保数据校验事件的观察者不会触发数据转换或数据存储事件。

  • 事件过滤: 在观察者中添加事件过滤机制,只处理特定条件的事件。例如,观察者可以根据事件的来源或事件携带的数据,决定是否处理该事件。

  • 依赖注入: 使用依赖注入容器管理对象之间的依赖关系,可以更容易地检测和解决循环依赖。

  • 避免双向关联: 尽量避免观察者和发布者之间的双向关联。如果观察者需要向发布者发送消息,可以使用回调函数或事件总线等机制。

    PIA PIA

    全面的AI聚合平台,一站式访问所有顶级AI模型

    PIA226 查看详情 PIA
  • 状态管理: 使用集中的状态管理系统,例如Redux或状态机,可以避免对象之间的直接依赖,从而减少循环依赖的风险。

  • 临时取消注册: 在处理事件时,临时取消观察者的注册,防止事件再次触发自身。处理完毕后再重新注册。这种方法需要谨慎使用,确保在取消注册期间不会丢失重要的事件通知。

如何在多线程环境中使用观察者模式和事件回调?

在多线程环境中使用观察者模式和事件回调需要特别注意线程安全问题。

  • 线程安全的数据结构: 观察者列表必须使用线程安全的数据结构,例如

    std::mutex
    保护的
    std::vector
    std::shared_mutex
    保护的
    std::map
  • 避免在回调函数中执行耗时操作: 回调函数应该尽可能简单快速,避免执行耗时操作,以免阻塞事件发布线程。如果需要执行耗时操作,可以将任务提交到线程池中异步执行。

  • 使用线程安全的事件队列: 可以使用线程安全的事件队列来缓冲事件,然后由单独的线程从队列中取出事件并通知观察者。这样可以避免事件发布线程被阻塞。

  • 避免死锁: 在回调函数中访问共享资源时,需要使用锁机制,但要避免死锁的发生。可以使用

    std::lock_guard
    std::unique_lock
    等RAII锁来自动管理锁的释放。
  • 原子操作: 对于简单的状态更新,可以使用原子操作,例如

    std::atomic
    ,来避免使用锁。
  • 读写锁: 如果读操作远多于写操作,可以使用读写锁(

    std::shared_mutex
    )来提高并发性能。读写锁允许多个线程同时读取共享资源,但只允许一个线程写入共享资源。
  • 避免在回调函数中修改观察者列表: 在回调函数中修改观察者列表可能会导致并发问题。应该避免这种情况,或者使用线程安全的数据结构和锁机制来保护观察者列表。

#include <iostream>
#include <vector>
#include <map>
#include <functional>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>

// 线程安全的事件队列
template <typename T>
class ThreadSafeQueue {
public:
  void enqueue(T item) {
    std::lock_guard<std::mutex> lock(mutex_);
    queue_.push(item);
    condition_.notify_one();
  }

  T dequeue() {
    std::unique_lock<std::mutex> lock(mutex_);
    condition_.wait(lock, [this] { return !queue_.empty(); });
    T item = queue_.front();
    queue_.pop();
    return item;
  }

private:
  std::queue<T> queue_;
  std::mutex mutex_;
  std::condition_variable condition_;
};


// 线程安全的Subject
class ThreadSafeSubject {
public:
    using Callback = std::function<void(const std::string&)>;
    using EventQueueItem = std::pair<EventType, std::string>;

    ThreadSafeSubject() : event_queue_(), is_running_(true) {
        // 启动事件处理线程
        event_thread_ = std::thread([this]() {
            while (is_running_) {
                try {
                    EventQueueItem event = event_queue_.dequeue();
                    notifyObservers(event.first, event.second);
                } catch (const std::exception& e) {
                    std::cerr << "Exception in event thread: " << e.what() << std::endl;
                }
            }
        });
    }

    ~ThreadSafeSubject() {
        is_running_ = false;
        event_queue_.enqueue({EventType::ERROR, "Shutdown"}); // Unblock queue if waiting
        if (event_thread_.joinable()) {
            event_thread_.join();
        }
    }


    void registerObserver(EventType type, Callback callback) {
        std::lock_guard<std::mutex> lock(observers_mutex_);
        observers_[type].push_back(callback);
    }

    void unregisterObserver(EventType type, Callback callback) {
        std::lock_guard<std::mutex> lock(observers_mutex_);
        auto& callbacks = observers_[type];
        for (auto it = callbacks.begin(); it != callbacks.end(); ++it) {
            if (*it == callback) {
                callbacks.erase(it);
                break;
            }
        }
    }

    void notify(EventType type, const std::string& data) {
        event_queue_.enqueue({type, data}); // 将事件放入队列
    }

private:
    void notifyObservers(EventType type, const std::string& data) {
        std::lock_guard<std::mutex> lock(observers_mutex_);
        for (const auto& callback : observers_[type]) {
            callback(data);
        }
    }

    std::map<EventType, std::vector<Callback>> observers_;
    std::mutex observers_mutex_;
    ThreadSafeQueue<EventQueueItem> event_queue_;
    std::thread event_thread_;
    std::atomic<bool> is_running_;
};

// 线程安全的观察者示例
class ThreadSafeDataProcessor {
public:
    ThreadSafeDataProcessor(ThreadSafeSubject& subject) : subject_(subject) {
        subject_.registerObserver(EventType::DATA_READY,
                                 std::bind(&ThreadSafeDataProcessor::onDataReady, this, std::placeholders::_1));
        subject_.registerObserver(EventType::ERROR,
                                 std::bind(&ThreadSafeDataProcessor::onError, this, std::placeholders::_1));
    }

    ~ThreadSafeDataProcessor() {
        subject_.unregisterObserver(EventType::DATA_READY,
                                   std::bind(&ThreadSafeDataProcessor::onDataReady, this, std::placeholders::_1));
        subject_.unregisterObserver(EventType::ERROR,
                                   std::bind(&ThreadSafeDataProcessor::onError, this, std::placeholders::_1));
    }

    void onDataReady(const std::string& data) {
        std::cout << "Thread " << std::this_thread::get_id() << ": DataProcessor received data: " << data << std::endl;
        // 模拟耗时的数据处理
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        std::cout << "Thread " << std::this_thread::get_id() << ": Processing data..." << std::endl;
        subject_.notify(EventType::DATA_PROCESSED, "Processed: " + data);
    }

    void onError(const std::string& errorMessage) {
        std::cerr << "Thread " << std::this_thread::get_id() << ": DataProcessor received error: " << errorMessage << std::endl;
    }

private:
    ThreadSafeSubject& subject_;
};

class ThreadSafeDataConsumer {
public:
    ThreadSafeDataConsumer(ThreadSafeSubject& subject) : subject_(subject) {
        subject_.registerObserver(EventType::DATA_PROCESSED,
                                 std::bind(&ThreadSafeDataConsumer::onDataProcessed, this, std::placeholders::_1));
    }

    ~ThreadSafeDataConsumer() {
        subject_.unregisterObserver(EventType::DATA_PROCESSED,
                                   std::bind(&ThreadSafeDataConsumer::onDataProcessed, this, std::placeholders::_1));
    }

    void onDataProcessed(const std::string& data) {
        std::cout << "Thread " << std::this_thread::get_id() << ": DataConsumer received processed data: " << data << std::endl;
    }

private:
    ThreadSafeSubject& subject_;
};


int main() {
    ThreadSafeSubject subject;
    ThreadSafeDataProcessor processor(subject);
    ThreadSafeDataConsumer consumer(subject);

    // 模拟多个线程同时产生数据
    std::vector<std::thread> threads;
    for (int i = 0; i < 5; ++i) {
        threads.emplace_back([&subject, i]() {
            std::this_thread::sleep_for(std::chrono::milliseconds(50 * i)); // 错开时间
            subject.notify(EventType::DATA_READY, "Raw data from sensor " + std::to_string(i));
        });
    }

    // 等待所有线程完成
    for (auto& thread : threads) {
        thread.join();
    }

    // 模拟发生错误
    subject.notify(EventType::ERROR, "File not found");

    // 等待一段时间,确保所有事件处理完成
    std::this_thread::sleep_for(std::chrono::seconds(1));

    return 0;
}
如何使用Lambda表达式简化事件回调的注册?

Lambda表达式可以简化事件回调的注册,避免创建单独的回调函数。

#include <iostream>
#include <vector>
#include <map>
#include <functional>

// 事件类型枚举
enum class EventType {
  BUTTON_CLICKED,
  TEXT_CHANGED
};

// 事件发布者
class Button {
public:
  using Callback = std::function<void(const std::string&)>;

  void registerObserver(EventType type, Callback callback) {
    observers_[type].push_back(callback);
  }

  void click() {
    notify(EventType::BUTTON_CLICKED, "Button clicked!");
  }

  void setText(const std::string& text) {
    text_ = text;
    notify(EventType::TEXT_CHANGED, text_);
  }

private:
  void notify(EventType type, const std::string& data) {
    for (const auto& callback : observers_[type]) {
      callback(data);
    }
  }

  std::map<EventType, std::vector<Callback>> observers_;
  std::string text_;
};

int main() {
  Button button;

  // 使用Lambda表达式注册回调函数
  button.registerObserver(EventType::BUTTON_CLICKED, [](const std::string& data) {
    std::cout << "Button click event: " << data << std::endl;
  });

  button.registerObserver(EventType::TEXT_CHANGED, [](const std::string& text) {
    std::cout << "Text changed event: " << text << std::endl;
  });

  button.click();
  button.setText("Hello, world!");

  return 0;
}

使用Lambda表达式可以使代码更简洁易懂,尤其是在回调函数逻辑比较简单的情况下。它避免了定义单独的函数,并将回调函数的定义直接嵌入到注册代码中。

以上就是C++观察者模式与事件回调结合使用的详细内容,更多请关注知识资源分享宝库其它相关文章!

相关标签: c++ ai ios 字符串常量 red 常量 字符串常量 回调函数 字符串 循环 Lambda 数据结构 线程 多线程 map 并发 对象 事件 异步 大家都在看: C++如何使用模板实现迭代器类 C++如何处理复合对象中的嵌套元素 C++内存模型与编译器优化理解 C++如何使用ofstream和ifstream组合操作文件 C++循环与算法优化提高程序执行效率

标签:  观察者 回调 模式 

发表评论:

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。