| From 3196a96a408a90f707dff3f31fa3d05d64aeb68a Mon Sep 17 00:00:00 2001 |
| From: Alessandro Decina <alessandro.d@gmail.com> |
| Date: Tue, 13 Oct 2015 12:49:19 +1100 |
| Subject: [PATCH] nicesrc: spin the agent mainloop in a separate thread |
| |
| Don't run the mainloop from the srcpad task, since that can get blocked in the |
| pipeline and cause unnecessary STUN retrasmissions (at best) and completely |
| block the agent (at worst). |
| --- |
| gst/gstnicesrc.c | 158 ++++++++++++++++++++++++++++++++----------------------- |
| gst/gstnicesrc.h | 4 +- |
| 2 files changed, 93 insertions(+), 69 deletions(-) |
| |
| diff --git a/gst/gstnicesrc.c b/gst/gstnicesrc.c |
| index d369e09..eb59fe9 100644 |
| --- a/gst/gstnicesrc.c |
| +++ b/gst/gstnicesrc.c |
| @@ -48,6 +48,14 @@ GST_DEBUG_CATEGORY_STATIC (nicesrc_debug); |
| |
| #define BUFFER_SIZE (65536) |
| |
| +static gboolean |
| +gst_nice_src_start ( |
| + GstBaseSrc *basesrc); |
| + |
| +static gboolean |
| +gst_nice_src_stop ( |
| + GstBaseSrc *basesrc); |
| + |
| static GstFlowReturn |
| gst_nice_src_create ( |
| GstPushSrc *basesrc, |
| @@ -57,10 +65,6 @@ static gboolean |
| gst_nice_src_unlock ( |
| GstBaseSrc *basesrc); |
| |
| -static gboolean |
| -gst_nice_src_unlock_stop ( |
| - GstBaseSrc *basesrc); |
| - |
| static void |
| gst_nice_src_set_property ( |
| GObject *object, |
| @@ -116,8 +120,9 @@ gst_nice_src_class_init (GstNiceSrcClass *klass) |
| gstpushsrc_class->create = GST_DEBUG_FUNCPTR (gst_nice_src_create); |
| |
| gstbasesrc_class = (GstBaseSrcClass *) klass; |
| + gstbasesrc_class->start = GST_DEBUG_FUNCPTR (gst_nice_src_start); |
| + gstbasesrc_class->stop = GST_DEBUG_FUNCPTR (gst_nice_src_stop); |
| gstbasesrc_class->unlock = GST_DEBUG_FUNCPTR (gst_nice_src_unlock); |
| - gstbasesrc_class->unlock_stop = GST_DEBUG_FUNCPTR (gst_nice_src_unlock_stop); |
| |
| gobject_class = (GObjectClass *) klass; |
| gobject_class->set_property = gst_nice_src_set_property; |
| @@ -179,9 +184,83 @@ gst_nice_src_init (GstNiceSrc *src) |
| src->component_id = 0; |
| src->mainctx = g_main_context_new (); |
| src->mainloop = g_main_loop_new (src->mainctx, FALSE); |
| - src->unlocked = FALSE; |
| - src->idle_source = NULL; |
| src->outbufs = g_queue_new (); |
| + src->agent_io_thread = NULL; |
| + g_cond_init (&src->outcond); |
| +} |
| + |
| +static gpointer |
| +gst_nice_src_agent_io_thread (gpointer data) |
| +{ |
| + GstNiceSrc *nicesrc = GST_NICE_SRC (data); |
| + |
| + GST_INFO_OBJECT (nicesrc, "starting agent io thread"); |
| + g_main_loop_run (nicesrc->mainloop); |
| + GST_INFO_OBJECT (nicesrc, "exiting agent io thread"); |
| + |
| + return NULL; |
| +} |
| + |
| +static gboolean |
| +main_loop_running_cb (gpointer data) |
| +{ |
| + GstNiceSrc *nicesrc = GST_NICE_SRC (data); |
| + |
| + GST_OBJECT_LOCK (nicesrc); |
| + /* _start() and _stop() could both be waiting for the mainloop to start so we |
| + * need to broadcast */ |
| + g_cond_broadcast (&nicesrc->outcond); |
| + GST_OBJECT_UNLOCK (nicesrc); |
| + |
| + return FALSE; |
| +} |
| + |
| +static gboolean |
| +gst_nice_src_start (GstBaseSrc * basesrc) |
| +{ |
| + GstNiceSrc *nicesrc = GST_NICE_SRC (basesrc); |
| + GSource *source; |
| + gchar *thread_name; |
| + |
| + GST_OBJECT_LOCK (nicesrc); |
| + source = g_idle_source_new (); |
| + g_source_set_callback (source, |
| + (GSourceFunc) main_loop_running_cb, nicesrc, NULL); |
| + g_source_attach (source, nicesrc->mainctx); |
| + g_source_unref (source); |
| + |
| + thread_name = g_strdup_printf ("%s:agent_io", GST_OBJECT_NAME (nicesrc)); |
| + nicesrc->agent_io_thread = g_thread_new (thread_name, gst_nice_src_agent_io_thread, nicesrc); |
| + g_free (thread_name); |
| + /* wait until the agent thread starts spinning the mainloop or _stop() is |
| + * called */ |
| + while (GST_BASE_SRC_IS_STARTING (basesrc) && |
| + !g_main_loop_is_running (nicesrc->mainloop)) |
| + g_cond_wait (&nicesrc->outcond, GST_OBJECT_GET_LOCK (nicesrc)); |
| + GST_OBJECT_UNLOCK (nicesrc); |
| + |
| + return TRUE; |
| +} |
| + |
| +static gboolean |
| +gst_nice_src_stop (GstBaseSrc * basesrc) |
| +{ |
| + GstNiceSrc *nicesrc = GST_NICE_SRC (basesrc); |
| + GThread *agent_io_thread = NULL; |
| + |
| + GST_OBJECT_LOCK (nicesrc); |
| + /* here we wait for the agent thread created in _start() to be scheduled so |
| + * that we don't risk calling _quit() first and then _run() on the mainloop */ |
| + while (!g_main_loop_is_running (nicesrc->mainloop)) |
| + g_cond_wait (&nicesrc->outcond, GST_OBJECT_GET_LOCK (nicesrc)); |
| + g_main_loop_quit (nicesrc->mainloop); |
| + agent_io_thread = nicesrc->agent_io_thread; |
| + nicesrc->agent_io_thread = NULL; |
| + GST_OBJECT_UNLOCK (nicesrc); |
| + |
| + g_thread_join (agent_io_thread); |
| + |
| + return TRUE; |
| } |
| |
| static void |
| @@ -207,62 +286,17 @@ gst_nice_src_read_callback (NiceAgent *agent, |
| #endif |
| GST_OBJECT_LOCK (nicesrc); |
| g_queue_push_tail (nicesrc->outbufs, buffer); |
| - g_main_loop_quit (nicesrc->mainloop); |
| + g_cond_signal (&nicesrc->outcond); |
| GST_OBJECT_UNLOCK (nicesrc); |
| } |
| |
| static gboolean |
| -gst_nice_src_unlock_idler (gpointer data) |
| -{ |
| - GstNiceSrc *nicesrc = GST_NICE_SRC (data); |
| - |
| - GST_OBJECT_LOCK (nicesrc); |
| - if (nicesrc->unlocked) |
| - g_main_loop_quit (nicesrc->mainloop); |
| - |
| - if (nicesrc->idle_source) { |
| - g_source_destroy (nicesrc->idle_source); |
| - g_source_unref (nicesrc->idle_source); |
| - nicesrc->idle_source = NULL; |
| - } |
| - GST_OBJECT_UNLOCK (nicesrc); |
| - |
| - return FALSE; |
| -} |
| - |
| -static gboolean |
| gst_nice_src_unlock (GstBaseSrc *src) |
| { |
| GstNiceSrc *nicesrc = GST_NICE_SRC (src); |
| |
| GST_OBJECT_LOCK (src); |
| - nicesrc->unlocked = TRUE; |
| - |
| - g_main_loop_quit (nicesrc->mainloop); |
| - |
| - if (!nicesrc->idle_source) { |
| - nicesrc->idle_source = g_idle_source_new (); |
| - g_source_set_priority (nicesrc->idle_source, G_PRIORITY_HIGH); |
| - g_source_set_callback (nicesrc->idle_source, gst_nice_src_unlock_idler, src, NULL); |
| - g_source_attach (nicesrc->idle_source, g_main_loop_get_context (nicesrc->mainloop)); |
| - } |
| - GST_OBJECT_UNLOCK (src); |
| - |
| - return TRUE; |
| -} |
| - |
| -static gboolean |
| -gst_nice_src_unlock_stop (GstBaseSrc *src) |
| -{ |
| - GstNiceSrc *nicesrc = GST_NICE_SRC (src); |
| - |
| - GST_OBJECT_LOCK (src); |
| - nicesrc->unlocked = FALSE; |
| - if (nicesrc->idle_source) { |
| - g_source_destroy (nicesrc->idle_source); |
| - g_source_unref(nicesrc->idle_source); |
| - } |
| - nicesrc->idle_source = NULL; |
| + g_cond_signal (&nicesrc->outcond); |
| GST_OBJECT_UNLOCK (src); |
| |
| return TRUE; |
| @@ -278,19 +312,8 @@ gst_nice_src_create ( |
| GST_LOG_OBJECT (nicesrc, "create called"); |
| |
| GST_OBJECT_LOCK (basesrc); |
| - if (nicesrc->unlocked) { |
| - GST_OBJECT_UNLOCK (basesrc); |
| -#if GST_CHECK_VERSION (1,0,0) |
| - return GST_FLOW_FLUSHING; |
| -#else |
| - return GST_FLOW_WRONG_STATE; |
| -#endif |
| - } |
| - if (g_queue_is_empty (nicesrc->outbufs)) { |
| - GST_OBJECT_UNLOCK (basesrc); |
| - g_main_loop_run (nicesrc->mainloop); |
| - GST_OBJECT_LOCK (basesrc); |
| - } |
| + if (g_queue_is_empty (nicesrc->outbufs)) |
| + g_cond_wait (&nicesrc->outcond, GST_OBJECT_GET_LOCK (nicesrc)); |
| |
| *buffer = g_queue_pop_head (nicesrc->outbufs); |
| GST_OBJECT_UNLOCK (basesrc); |
| @@ -331,6 +354,7 @@ gst_nice_src_dispose (GObject *object) |
| g_queue_free (src->outbufs); |
| } |
| src->outbufs = NULL; |
| + g_cond_clear (&src->outcond); |
| |
| G_OBJECT_CLASS (gst_nice_src_parent_class)->dispose (object); |
| } |
| diff --git a/gst/gstnicesrc.h b/gst/gstnicesrc.h |
| index 5d3f554..2d9f674 100644 |
| --- a/gst/gstnicesrc.h |
| +++ b/gst/gstnicesrc.h |
| @@ -68,8 +68,8 @@ struct _GstNiceSrc |
| GMainContext *mainctx; |
| GMainLoop *mainloop; |
| GQueue *outbufs; |
| - gboolean unlocked; |
| - GSource *idle_source; |
| + GCond outcond; |
| + GThread *agent_io_thread; |
| }; |
| |
| typedef struct _GstNiceSrcClass GstNiceSrcClass; |
| -- |
| 2.3.4 |
| |