fix: Try to fix the non-platform thread call error. (#1795)

* fix: Try to fix the non-platform thread call error.

* using hidden window for event post.

* fix.

* fix.
This commit is contained in:
CloudWebRTC
2025-03-24 11:38:30 +08:00
committed by GitHub
parent 9b48495c9c
commit 1c2ff6f497
21 changed files with 296 additions and 67 deletions

View File

@ -12,8 +12,10 @@
#include <list>
#include <memory>
#include <string>
#include <mutex>
#include <optional>
#include <queue>
#include <string>
typedef flutter::EncodableValue EncodableValue;
typedef flutter::EncodableMap EncodableMap;
@ -27,6 +29,8 @@ typedef flutter::EventSink<EncodableValue> EventSink;
typedef flutter::MethodCall<EncodableValue> MethodCall;
typedef flutter::MethodResult<EncodableValue> MethodResult;
class TaskRunner;
// foo.StringValue() becomes std::get<std::string>(foo)
// foo.IsString() becomes std::holds_alternative<std::string>(foo)
@ -90,7 +94,8 @@ inline double findDouble(const EncodableMap& map, const std::string& key) {
return 0.0;
}
inline std::optional<double> maybeFindDouble(const EncodableMap& map, const std::string& key) {
inline std::optional<double> maybeFindDouble(const EncodableMap& map,
const std::string& key) {
auto it = map.find(EncodableValue(key));
if (it != map.end() && TypeIs<double>(it->second))
return GetValue<double>(it->second);
@ -171,6 +176,7 @@ class EventChannelProxy {
public:
static std::unique_ptr<EventChannelProxy> Create(
BinaryMessenger* messenger,
TaskRunner* task_runner,
const std::string& channelName);
virtual ~EventChannelProxy() = default;

View File

@ -10,6 +10,7 @@ class FlutterRTCDataChannelObserver : public RTCDataChannelObserver {
public:
FlutterRTCDataChannelObserver(scoped_refptr<RTCDataChannel> data_channel,
BinaryMessenger* messenger,
TaskRunner* task_runner,
const std::string& channel_name);
virtual ~FlutterRTCDataChannelObserver();

View File

@ -10,8 +10,8 @@ namespace flutter_webrtc_plugin {
class FlutterFrameCryptorObserver : public libwebrtc::RTCFrameCryptorObserver {
public:
FlutterFrameCryptorObserver(BinaryMessenger* messenger,const std::string& channelName)
: event_channel_(EventChannelProxy::Create(messenger, channelName)) {}
FlutterFrameCryptorObserver(BinaryMessenger* messenger, TaskRunner* task_runner, const std::string& channelName)
: event_channel_(EventChannelProxy::Create(messenger, task_runner, channelName)) {}
void OnFrameCryptionStateChanged(
const string participant_id,
libwebrtc::RTCFrameCryptionState state);

View File

@ -11,6 +11,7 @@ class FlutterPeerConnectionObserver : public RTCPeerConnectionObserver {
FlutterPeerConnectionObserver(FlutterWebRTCBase* base,
scoped_refptr<RTCPeerConnection> peerconnection,
BinaryMessenger* messenger,
TaskRunner* task_runner,
const std::string& channel_name,
std::string& peerConnectionId);

View File

@ -22,6 +22,7 @@ class FlutterVideoRenderer
void initialize(TextureRegistrar* registrar,
BinaryMessenger* messenger,
TaskRunner* task_runner,
std::unique_ptr<flutter::TextureVariant> texture,
int64_t texture_id);

View File

@ -21,6 +21,8 @@ class FlutterWebRTCPlugin : public flutter::Plugin {
virtual BinaryMessenger* messenger() = 0;
virtual TextureRegistrar* textures() = 0;
virtual TaskRunner* task_runner() = 0;
};
class FlutterWebRTC : public FlutterWebRTCBase,

View File

@ -43,7 +43,7 @@ class FlutterWebRTCBase {
enum ParseConstraintType { kMandatory, kOptional };
public:
FlutterWebRTCBase(BinaryMessenger* messenger, TextureRegistrar* textures);
FlutterWebRTCBase(BinaryMessenger* messenger, TextureRegistrar* textures, TaskRunner* task_runner);
~FlutterWebRTCBase();
std::string GenerateUUID();
@ -122,6 +122,7 @@ class FlutterWebRTCBase {
protected:
BinaryMessenger* messenger_;
TaskRunner *task_runner_;
TextureRegistrar* textures_;
std::unique_ptr<EventChannelProxy> event_channel_;
};

View File

@ -0,0 +1,17 @@
// Copyright 2024 The Flutter Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef PACKAGES_FLUTTER_WEBRTC_TASK_RUNNER_H_
#define PACKAGES_FLUTTER_WEBRTC_TASK_RUNNER_H_
#include <functional>
using TaskClosure = std::function<void()>;
class TaskRunner {
public:
virtual void EnqueueTask(TaskClosure task) = 0;
virtual ~TaskRunner() = default;
};
#endif // PACKAGES_FLUTTER_WEBRTC_TASK_RUNNER_H_

View File

@ -1,4 +1,7 @@
#include "flutter_common.h"
#include "task_runner.h"
#include <memory>
class MethodCallProxyImpl : public MethodCallProxy {
public:
@ -66,56 +69,75 @@ std::unique_ptr<MethodResultProxy> MethodResultProxy::Create(
}
class EventChannelProxyImpl : public EventChannelProxy {
public:
EventChannelProxyImpl(BinaryMessenger* messenger,
const std::string& channelName)
: channel_(std::make_unique<EventChannel>(
messenger,
channelName,
&flutter::StandardMethodCodec::GetInstance())) {
auto handler = std::make_unique<
flutter::StreamHandlerFunctions<EncodableValue>>(
[&](const EncodableValue* arguments,
std::unique_ptr<flutter::EventSink<EncodableValue>>&& events)
-> std::unique_ptr<flutter::StreamHandlerError<EncodableValue>> {
sink_ = std::move(events);
for (auto& event : event_queue_) {
sink_->Success(event);
}
event_queue_.clear();
on_listen_called_ = true;
return nullptr;
},
[&](const EncodableValue* arguments)
-> std::unique_ptr<flutter::StreamHandlerError<EncodableValue>> {
on_listen_called_ = false;
return nullptr;
});
public:
EventChannelProxyImpl(BinaryMessenger* messenger,
TaskRunner* task_runner,
const std::string& channelName)
: channel_(std::make_unique<EventChannel>(
messenger,
channelName,
&flutter::StandardMethodCodec::GetInstance())),
task_runner_(task_runner) {
auto handler = std::make_unique<
flutter::StreamHandlerFunctions<EncodableValue>>(
[&](const EncodableValue* arguments,
std::unique_ptr<flutter::EventSink<EncodableValue>>&& events)
-> std::unique_ptr<flutter::StreamHandlerError<EncodableValue>> {
sink_ = std::move(events);
std::weak_ptr<EventSink> weak_sink = sink_;
for (auto& event : event_queue_) {
PostEvent(event);
}
event_queue_.clear();
on_listen_called_ = true;
return nullptr;
},
[&](const EncodableValue* arguments)
-> std::unique_ptr<flutter::StreamHandlerError<EncodableValue>> {
on_listen_called_ = false;
return nullptr;
});
channel_->SetStreamHandler(std::move(handler));
}
virtual ~EventChannelProxyImpl() {}
void Success(const EncodableValue& event, bool cache_event = true) override {
if (on_listen_called_) {
PostEvent(event);
} else {
if (cache_event) {
event_queue_.push_back(event);
}
}
}
channel_->SetStreamHandler(std::move(handler));
}
virtual ~EventChannelProxyImpl() {}
void Success(const EncodableValue& event, bool cache_event = true) override {
if (on_listen_called_) {
void PostEvent(const EncodableValue& event) {
if(task_runner_) {
std::weak_ptr<EventSink> weak_sink = sink_;
task_runner_->EnqueueTask([weak_sink, event]() {
auto sink = weak_sink.lock();
if (sink) {
sink->Success(event);
}
});
} else {
sink_->Success(event);
} else {
if (cache_event) {
event_queue_.push_back(event);
}
}
}
private:
std::unique_ptr<EventChannel> channel_;
std::unique_ptr<EventSink> sink_;
std::list<EncodableValue> event_queue_;
bool on_listen_called_ = false;
};
}
}
private:
std::unique_ptr<EventChannel> channel_;
std::shared_ptr<flutter::EventSink<flutter::EncodableValue>> sink_;
std::list<EncodableValue> event_queue_;
bool on_listen_called_ = false;
TaskRunner* task_runner_;
};
std::unique_ptr<EventChannelProxy> EventChannelProxy::Create(
BinaryMessenger* messenger,
TaskRunner* task_runner,
const std::string& channelName) {
return std::make_unique<EventChannelProxyImpl>(messenger, channelName);
}
return std::make_unique<EventChannelProxyImpl>(messenger, task_runner, channelName);
}

View File

@ -7,8 +7,9 @@ namespace flutter_webrtc_plugin {
FlutterRTCDataChannelObserver::FlutterRTCDataChannelObserver(
scoped_refptr<RTCDataChannel> data_channel,
BinaryMessenger* messenger,
TaskRunner* task_runner,
const std::string& channelName)
: event_channel_(EventChannelProxy::Create(messenger, channelName)),
: event_channel_(EventChannelProxy::Create(messenger, task_runner, channelName)),
data_channel_(data_channel) {
data_channel_->RegisterObserver(this);
}
@ -53,7 +54,7 @@ void FlutterDataChannel::CreateDataChannel(
"FlutterWebRTC/dataChannelEvent" + peerConnectionId + uuid;
std::unique_ptr<FlutterRTCDataChannelObserver> observer(
new FlutterRTCDataChannelObserver(data_channel, base_->messenger_,
new FlutterRTCDataChannelObserver(data_channel, base_->messenger_, base_->task_runner_,
event_channel));
base_->lock();

View File

@ -166,7 +166,7 @@ void FlutterFrameCryptor::FrameCryptorFactoryCreateFrameCryptor(
keyProvider);
std::string event_channel = "FlutterWebRTC/frameCryptorEvent" + uuid;
scoped_refptr<FlutterFrameCryptorObserver> observer(new RefCountedObject<FlutterFrameCryptorObserver>(base_->messenger_, event_channel));
scoped_refptr<FlutterFrameCryptorObserver> observer(new RefCountedObject<FlutterFrameCryptorObserver>(base_->messenger_, base_->task_runner_, event_channel));
frameCryptor->RegisterRTCFrameCryptorObserver(observer);
@ -192,7 +192,7 @@ void FlutterFrameCryptor::FrameCryptorFactoryCreateFrameCryptor(
std::string event_channel = "FlutterWebRTC/frameCryptorEvent" + uuid;
scoped_refptr<FlutterFrameCryptorObserver> observer(new RefCountedObject<FlutterFrameCryptorObserver>(base_->messenger_, event_channel));
scoped_refptr<FlutterFrameCryptorObserver> observer(new RefCountedObject<FlutterFrameCryptorObserver>(base_->messenger_, base_->task_runner_, event_channel));
frameCryptor->RegisterRTCFrameCryptorObserver(observer.get());

View File

@ -338,6 +338,7 @@ void FlutterPeerConnection::CreateRTCPeerConnection(
std::unique_ptr<FlutterPeerConnectionObserver> observer(
new FlutterPeerConnectionObserver(base_, pc, base_->messenger_,
base_->task_runner_,
event_channel, uuid));
base_->peerconnection_observers_[uuid] = std::move(observer);
@ -1118,9 +1119,10 @@ FlutterPeerConnectionObserver::FlutterPeerConnectionObserver(
FlutterWebRTCBase* base,
scoped_refptr<RTCPeerConnection> peerconnection,
BinaryMessenger* messenger,
TaskRunner* task_runner,
const std::string& channel_name,
std::string& peerConnectionId)
: event_channel_(EventChannelProxy::Create(messenger, channel_name)),
: event_channel_(EventChannelProxy::Create(messenger, task_runner, channel_name)),
peerconnection_(peerconnection),
base_(base),
id_(peerConnectionId) {
@ -1326,6 +1328,7 @@ void FlutterPeerConnectionObserver::OnDataChannel(
std::unique_ptr<FlutterRTCDataChannelObserver> observer(
new FlutterRTCDataChannelObserver(data_channel, base_->messenger_,
base_->task_runner_,
event_channel));
base_->lock();

View File

@ -7,6 +7,7 @@ FlutterVideoRenderer::~FlutterVideoRenderer() {}
void FlutterVideoRenderer::initialize(
TextureRegistrar* registrar,
BinaryMessenger* messenger,
TaskRunner* task_runner,
std::unique_ptr<flutter::TextureVariant> texture,
int64_t trxture_id) {
registrar_ = registrar;
@ -14,7 +15,7 @@ void FlutterVideoRenderer::initialize(
texture_id_ = trxture_id;
std::string channel_name =
"FlutterWebRTC/Texture" + std::to_string(texture_id_);
event_channel_ = EventChannelProxy::Create(messenger, channel_name);
event_channel_ = EventChannelProxy::Create(messenger, task_runner, channel_name);
}
const FlutterDesktopPixelBuffer* FlutterVideoRenderer::CopyPixelBuffer(
@ -121,7 +122,7 @@ void FlutterVideoRendererManager::CreateVideoRendererTexture(
}));
auto texture_id = base_->textures_->RegisterTexture(textureVariant.get());
texture->initialize(base_->textures_, base_->messenger_,
texture->initialize(base_->textures_, base_->messenger_, base_->task_runner_,
std::move(textureVariant), texture_id);
renderers_[texture_id] = texture;
EncodableMap params;

View File

@ -6,7 +6,8 @@ namespace flutter_webrtc_plugin {
FlutterWebRTC::FlutterWebRTC(FlutterWebRTCPlugin* plugin)
: FlutterWebRTCBase::FlutterWebRTCBase(plugin->messenger(),
plugin->textures()),
plugin->textures(),
plugin->task_runner()),
FlutterVideoRendererManager::FlutterVideoRendererManager(this),
FlutterMediaStream::FlutterMediaStream(this),
FlutterPeerConnection::FlutterPeerConnection(this),

View File

@ -8,14 +8,15 @@ namespace flutter_webrtc_plugin {
const char* kEventChannelName = "FlutterWebRTC.Event";
FlutterWebRTCBase::FlutterWebRTCBase(BinaryMessenger* messenger,
TextureRegistrar* textures)
: messenger_(messenger), textures_(textures) {
TextureRegistrar* textures,
TaskRunner *task_runner)
: messenger_(messenger), task_runner_(task_runner), textures_(textures) {
LibWebRTC::Initialize();
factory_ = LibWebRTC::CreateRTCPeerConnectionFactory();
audio_device_ = factory_->GetAudioDevice();
video_device_ = factory_->GetVideoDevice();
desktop_device_ = factory_->GetDesktopDevice();
event_channel_ = EventChannelProxy::Create(messenger_, kEventChannelName);
event_channel_ = EventChannelProxy::Create(messenger_, task_runner_, kEventChannelName);
}
FlutterWebRTCBase::~FlutterWebRTCBase() {

View File

@ -37,6 +37,8 @@ class FlutterWebRTCPluginImpl : public FlutterWebRTCPlugin {
TextureRegistrar* textures() { return textures_; }
TaskRunner* task_runner() { return nullptr; }
private:
// Creates a plugin that communicates on the given channel.
FlutterWebRTCPluginImpl(PluginRegistrar* registrar,

View File

@ -37,6 +37,8 @@ class FlutterWebRTCPluginImpl : public FlutterWebRTCPlugin {
TextureRegistrar* textures() { return textures_; }
TaskRunner* task_runner() { return nullptr; }
private:
// Creates a plugin that communicates on the given channel.
FlutterWebRTCPluginImpl(PluginRegistrar* registrar,

View File

@ -22,6 +22,7 @@ add_library(${PLUGIN_NAME} SHARED
"../common/cpp/src/flutter_webrtc_base.cc"
"../third_party/uuidxx/uuidxx.cc"
"flutter_webrtc_plugin.cc"
"task_runner_windows.cc"
)
include_directories(

View File

@ -2,6 +2,7 @@
#include "flutter_common.h"
#include "flutter_webrtc.h"
#include "task_runner_windows.h"
const char* kChannelName = "FlutterWebRTC.Method";
@ -35,13 +36,16 @@ class FlutterWebRTCPluginImpl : public FlutterWebRTCPlugin {
TextureRegistrar* textures() { return textures_; }
TaskRunner* task_runner() { return task_runner_.get(); }
private:
// Creates a plugin that communicates on the given channel.
FlutterWebRTCPluginImpl(PluginRegistrar* registrar,
std::unique_ptr<MethodChannel> channel)
: channel_(std::move(channel)),
messenger_(registrar->messenger()),
textures_(registrar->texture_registrar()) {
textures_(registrar->texture_registrar()),
task_runner_(std::make_unique<TaskRunnerWindows>()) {
webrtc_ = std::make_unique<FlutterWebRTC>(this);
}
@ -59,6 +63,7 @@ class FlutterWebRTCPluginImpl : public FlutterWebRTCPlugin {
std::unique_ptr<FlutterWebRTC> webrtc_;
BinaryMessenger* messenger_;
TextureRegistrar* textures_;
std::unique_ptr<TaskRunner> task_runner_;
};
} // namespace flutter_webrtc_plugin
@ -66,7 +71,7 @@ class FlutterWebRTCPluginImpl : public FlutterWebRTCPlugin {
void FlutterWebRTCPluginRegisterWithRegistrar(
FlutterDesktopPluginRegistrarRef registrar) {
static auto* plugin_registrar = new flutter::PluginRegistrar(registrar);
flutter_webrtc_plugin::FlutterWebRTCPluginImpl::RegisterWithRegistrar(
plugin_registrar);
static auto* plugin_registrar = new flutter::PluginRegistrar(registrar);
flutter_webrtc_plugin::FlutterWebRTCPluginImpl::RegisterWithRegistrar(
plugin_registrar);
}

View File

@ -0,0 +1,106 @@
// Copyright 2013 The Flutter Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "task_runner_windows.h"
#include <algorithm>
#include <iostream>
namespace flutter_webrtc_plugin {
TaskRunnerWindows::TaskRunnerWindows() {
WNDCLASS window_class = RegisterWindowClass();
window_handle_ =
CreateWindowEx(0, window_class.lpszClassName, L"", 0, 0, 0, 0, 0,
HWND_MESSAGE, nullptr, window_class.hInstance, nullptr);
if (window_handle_) {
SetWindowLongPtr(window_handle_, GWLP_USERDATA,
reinterpret_cast<LONG_PTR>(this));
} else {
auto error = GetLastError();
LPWSTR message = nullptr;
FormatMessageW(FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM |
FORMAT_MESSAGE_IGNORE_INSERTS,
NULL, error, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
reinterpret_cast<LPWSTR>(&message), 0, NULL);
OutputDebugString(message);
LocalFree(message);
}
}
TaskRunnerWindows::~TaskRunnerWindows() {
if (window_handle_) {
DestroyWindow(window_handle_);
window_handle_ = nullptr;
}
UnregisterClass(window_class_name_.c_str(), nullptr);
}
void TaskRunnerWindows::EnqueueTask(TaskClosure task) {
{
std::lock_guard<std::mutex> lock(tasks_mutex_);
tasks_.push(task);
}
if (!PostMessage(window_handle_, WM_NULL, 0, 0)) {
DWORD error_code = GetLastError();
std::cerr << "Failed to post message to main thread; error_code: "
<< error_code << std::endl;
}
}
void TaskRunnerWindows::ProcessTasks() {
// Even though it would usually be sufficient to process only a single task
// whenever we receive the message, if the message queue happens to be full,
// we might not receive a message for each individual task.
for (;;) {
std::lock_guard<std::mutex> lock(tasks_mutex_);
if (tasks_.empty()) break;
TaskClosure task = tasks_.front();
tasks_.pop();
task();
}
}
WNDCLASS TaskRunnerWindows::RegisterWindowClass() {
window_class_name_ = L"FlutterWebRTCWindowsTaskRunnerWindow";
WNDCLASS window_class{};
window_class.hCursor = nullptr;
window_class.lpszClassName = window_class_name_.c_str();
window_class.style = 0;
window_class.cbClsExtra = 0;
window_class.cbWndExtra = 0;
window_class.hInstance = GetModuleHandle(nullptr);
window_class.hIcon = nullptr;
window_class.hbrBackground = 0;
window_class.lpszMenuName = nullptr;
window_class.lpfnWndProc = WndProc;
RegisterClass(&window_class);
return window_class;
}
LRESULT
TaskRunnerWindows::HandleMessage(UINT const message, WPARAM const wparam,
LPARAM const lparam) noexcept {
switch (message) {
case WM_NULL:
ProcessTasks();
return 0;
}
return DefWindowProcW(window_handle_, message, wparam, lparam);
}
LRESULT TaskRunnerWindows::WndProc(HWND const window, UINT const message,
WPARAM const wparam,
LPARAM const lparam) noexcept {
if (auto* that = reinterpret_cast<TaskRunnerWindows*>(
GetWindowLongPtr(window, GWLP_USERDATA))) {
return that->HandleMessage(message, wparam, lparam);
} else {
return DefWindowProc(window, message, wparam, lparam);
}
}
} // namespace flutter_webrtc_plugin

View File

@ -0,0 +1,55 @@
// Copyright 2024 The Flutter Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef PACKAGES_FLUTTER_WEBRTC_WINDOWS_TASK_RUNNER_WINDOW_H_
#define PACKAGES_FLUTTER_WEBRTC_WINDOWS_TASK_RUNNER_WINDOW_H_
#include <windows.h>
#include <chrono>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include "task_runner.h"
namespace flutter_webrtc_plugin {
// Hidden HWND responsible for processing camera tasks on main thread
// Adapted from Flutter Engine, see:
// https://github.com/flutter/flutter/issues/134346#issuecomment-2141023146
// and:
// https://github.com/flutter/engine/blob/d7c0bcfe7a30408b0722c9d47d8b0b1e4cdb9c81/shell/platform/windows/task_runner_window.h
class TaskRunnerWindows : public TaskRunner {
public:
virtual void EnqueueTask(TaskClosure task);
TaskRunnerWindows();
~TaskRunnerWindows();
private:
void ProcessTasks();
WNDCLASS RegisterWindowClass();
LRESULT
HandleMessage(UINT const message, WPARAM const wparam,
LPARAM const lparam) noexcept;
static LRESULT CALLBACK WndProc(HWND const window, UINT const message,
WPARAM const wparam,
LPARAM const lparam) noexcept;
HWND window_handle_;
std::wstring window_class_name_;
std::mutex tasks_mutex_;
std::queue<TaskClosure> tasks_;
// Prevent copying.
TaskRunnerWindows(TaskRunnerWindows const&) = delete;
TaskRunnerWindows& operator=(TaskRunnerWindows const&) = delete;
};
} // namespace flutter_webrtc_plugin
#endif // PACKAGES_FLUTTER_WEBRTC_WINDOWS_TASK_RUNNER_WINDOW_H_