Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ Application Options:

# Testing the WHEP client

The WHEP client only requires a few arguments, namely the WHEP endpoint to subscribe to (e.g., an endpoint created in the [Simple WHEP Server](https://github.com/meetecho/simple-whep-server)) and the audio and/or video caps of the codecs you expect to receive. If codecs match, incoming streams are automatically created from the negotiation process, and rendered accordingly.
The WHEP client only requires a few arguments, namely the WHEP endpoint to subscribe to (e.g., an endpoint created in the [Simple WHEP Server](https://github.com/meetecho/simple-whep-server)) and the audio and/or video caps of the codecs you expect to receive. It will initially attempt to send an SDP offer, but if the server changes stance to reply with a server-sent offer instead (via a 406 response), it will automatically switch mode accordingly and prepare an answer to send back. If at the end of the exchange (however it happened) codecs match, incoming streams are automatically created from the negotiation process, and rendered accordingly.

A simple example, that assumes the specified endpoint requires the "verysecret" token via Bearer authorization, is the following:

Expand Down
207 changes: 178 additions & 29 deletions src/whep-client.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ enum whep_state {
WHEP_STATE_CONNECTED,
WHEP_STATE_SUBSCRIBING,
WHEP_STATE_OFFER_PREPARED,
WHEP_STATE_ANSWER_PREPARED,
WHEP_STATE_STARTED,
WHEP_STATE_API_ERROR,
WHEP_STATE_ERROR
Expand All @@ -55,6 +56,7 @@ static gboolean no_trickle = FALSE, gathering_done = FALSE,
static const char *stun_server = NULL, **turn_server = NULL;
static char *auto_stun_server = NULL, **auto_turn_server = NULL;
static int latency = -1;
static gboolean server_sent_offer = FALSE;

/* API properties */
static enum whep_state state = 0;
Expand All @@ -68,9 +70,10 @@ static GAsyncQueue *candidates = NULL;
/* Helper methods and callbacks */
static gboolean whep_check_plugins(void);
static void whep_options(void);
static gboolean whep_initialize(void);
static gboolean whep_initialize(gboolean offer);
static void whep_negotiation_needed(GstElement *element, gpointer user_data);
static void whep_offer_available(GstPromise *promise, gpointer user_data);
static void whep_answer_available(GstPromise *promise, gpointer user_data);
static void whep_candidate(GstElement *webrtc G_GNUC_UNUSED,
guint mlineindex, char *candidate, gpointer user_data G_GNUC_UNUSED);
static gboolean whep_send_candidates(gpointer user_data);
Expand All @@ -83,8 +86,9 @@ static void whep_ice_connection_state(GstElement *webrtc, GParamSpec *pspec,
static void whep_dtls_connection_state(GstElement *dtls, GParamSpec *pspec,
gpointer user_data G_GNUC_UNUSED);
static void whep_connect(GstWebRTCSessionDescription *offer);
static void whep_answer(GstWebRTCSessionDescription *answer);
static void whep_process_link_header(char *link);
static gboolean whep_parse_offer(char *sdp_offer);
static gboolean whep_parse_sdp(char *sdp_object);
static void whep_disconnect(char *reason);
static void whep_incoming_stream(GstElement *webrtc, GstPad *pad, gpointer user_data);

Expand Down Expand Up @@ -231,7 +235,7 @@ int main(int argc, char *argv[]) {
if(follow_link)
whep_options();
/* Initialize the stack (and then connect to the WHEP endpoint) */
if(!whep_initialize())
if(!whep_initialize(TRUE))
exit(1);

/* Loop forever */
Expand Down Expand Up @@ -354,7 +358,7 @@ static gboolean source_events(GstPad *pad, GstObject *parent, GstEvent *event) {
}

/* Helper method to initialize the GStreamer WebRTC stack */
static gboolean whep_initialize(void) {
static gboolean whep_initialize(gboolean offer) {
/* Prepare the pipeline, using the info we got from the command line */
pipeline = gst_pipeline_new(NULL);
pc = gst_element_factory_make("webrtcbin", NULL);
Expand All @@ -367,14 +371,14 @@ static gboolean whep_initialize(void) {
g_object_set(pc, "ice-transport-policy", 1, NULL);
gst_bin_add(GST_BIN(pipeline), pc);
/* Add transceivers to receive audio and/or video */
if(audio_caps) {
if(offer && audio_caps) {
GstWebRTCRTPTransceiver *transceiver = NULL;
GstCaps *caps = gst_caps_from_string(audio_caps);
g_signal_emit_by_name(pc, "add-transceiver", GST_WEBRTC_RTP_TRANSCEIVER_DIRECTION_RECVONLY, caps, &transceiver);
gst_caps_unref(caps);
gst_object_unref(transceiver);
}
if(video_caps) {
if(offer && video_caps) {
GstWebRTCRTPTransceiver *transceiver = NULL;
GstCaps *caps = gst_caps_from_string(video_caps);
g_signal_emit_by_name(pc, "add-transceiver", GST_WEBRTC_RTP_TRANSCEIVER_DIRECTION_RECVONLY, caps, &transceiver);
Expand Down Expand Up @@ -409,7 +413,8 @@ static gboolean whep_initialize(void) {
}
}
/* Let's configure the function to be invoked when an SDP offer can be prepared */
g_signal_connect(pc, "on-negotiation-needed", G_CALLBACK(whep_negotiation_needed), NULL);
if(offer)
g_signal_connect(pc, "on-negotiation-needed", G_CALLBACK(whep_negotiation_needed), NULL);
/* We need a different callback to be notified about candidates to trickle to Janus */
g_signal_connect(pc, "on-ice-candidate", G_CALLBACK(whep_candidate), NULL);
/* We also add a couple of callbacks to be notified about connection state changes */
Expand Down Expand Up @@ -496,6 +501,39 @@ static void whep_offer_available(GstPromise *promise, gpointer user_data) {
}
}

/* Callback invoked when we have an SDP answer ready to be sent (server-sent offers) */
static GstWebRTCSessionDescription *answer = NULL;
static void whep_answer_available(GstPromise *promise, gpointer user_data) {
WHEP_PREFIX(LOG_INFO, "Answer created\n");
/* Make sure we're in the right state */
g_assert_cmphex(state, ==, WHEP_STATE_ANSWER_PREPARED);
g_assert_cmphex(gst_promise_wait(promise), ==, GST_PROMISE_RESULT_REPLIED);
const GstStructure *reply = gst_promise_get_reply(promise);
gst_structure_get(reply, "answer", GST_TYPE_WEBRTC_SESSION_DESCRIPTION, &answer, NULL);
gst_promise_unref(promise);

/* Set the local description locally */
WHEP_PREFIX(LOG_INFO, "Setting local description\n");
promise = gst_promise_new();
g_signal_emit_by_name(pc, "set-local-description", answer, promise);
gst_promise_interrupt(promise);
gst_promise_unref(promise);

/* Now that a DTLS stack is available, try monitoring the DTLS state too */
GstElement *dtls = gst_bin_get_by_name(GST_BIN(pc), "dtlsdec0");
g_signal_connect(dtls, "notify::connection-state", G_CALLBACK(whep_dtls_connection_state), NULL);
gst_object_unref(dtls);

/* Now that the answer is ready, send it to the server via PATCH
* (unless we're not tricking, in which case we wait for gathering to be
* completed, and then add all candidates to this answer before sending it) */
if(!no_trickle || gathering_done) {
whep_answer(answer);
gst_webrtc_session_description_free(answer);
answer = NULL;
}
}

/* Callback invoked when a candidate to trickle becomes available */
static void whep_candidate(GstElement *webrtc G_GNUC_UNUSED,
guint mlineindex, char *candidate, gpointer user_data G_GNUC_UNUSED) {
Expand Down Expand Up @@ -606,9 +644,15 @@ static void whep_ice_gathering_state(GstElement *webrtc, GParamSpec *pspec,
gathering_done = TRUE;
/* If we're not trickling, send the SDP with all candidates now */
if(no_trickle) {
whep_connect(offer);
gst_webrtc_session_description_free(offer);
offer = NULL;
if(!server_sent_offer) {
whep_connect(offer);
gst_webrtc_session_description_free(offer);
offer = NULL;
} else {
whep_answer(answer);
gst_webrtc_session_description_free(answer);
answer = NULL;
}
}
break;
default:
Expand Down Expand Up @@ -716,7 +760,7 @@ static void whep_connect(GstWebRTCSessionDescription *offer) {
WHEP_LOG(LOG_VERB, "%s\n", sdp_offer);

/* Partially parse the SDP to find ICE credentials and the mid for the bundle m-line */
if(!whep_parse_offer(sdp_offer)) {
if(!whep_parse_sdp(sdp_offer)) {
whep_disconnect("SDP error");
return;
}
Expand All @@ -726,7 +770,9 @@ static void whep_connect(GstWebRTCSessionDescription *offer) {
GBytes *bytes = NULL;
guint status = whep_http_send(&session, "POST", (char *)server_url, sdp_offer, "application/sdp", &bytes);
g_free(sdp_offer);
if(status != 201) {
/* 201 means the server accepted our client-sent offer, 406 means the
* server may be asking to switch stance to s server-sent offer instead*/
if(status != 201 && status != 406) {
/* Didn't get the success we were expecting */
WHEP_LOG(LOG_ERR, " [%u] %s\n", status, status ? soup_message_get_reason_phrase(session.msg) : "HTTP error");
g_object_unref(session.msg);
Expand All @@ -736,6 +782,21 @@ static void whep_connect(GstWebRTCSessionDescription *offer) {
whep_disconnect("HTTP error");
return;
}
if(status == 406) {
WHEP_PREFIX(LOG_INFO, "Server switched to server-sent offers (406)\n");
/* FIXME Reset the pipeline */
pipeline = NULL;
pc = NULL;
WHEP_PREFIX(LOG_INFO, "Re-initializing the pipeline\n");
if(!whep_initialize(FALSE)) {
g_object_unref(session.msg);
g_object_unref(session.http_conn);
if(bytes != NULL)
g_bytes_unref(bytes);
whep_disconnect("Stance change error");
return;
}
}
/* Get the response */
const char *content_type = soup_message_headers_get_content_type(soup_message_get_response_headers(session.msg), NULL);
if(content_type == NULL || strcasecmp(content_type, "application/sdp")) {
Expand All @@ -749,20 +810,20 @@ static void whep_connect(GstWebRTCSessionDescription *offer) {
}
/* Get the body */
if(bytes == NULL || g_bytes_get_size(bytes) == 0) {
WHEP_LOG(LOG_ERR, "Missing SDP answer\n");
WHEP_LOG(LOG_ERR, "Missing SDP\n");
g_object_unref(session.msg);
g_object_unref(session.http_conn);
if(bytes != NULL)
g_bytes_unref(bytes);
whep_disconnect("SDP error");
return;
}
char *answer = g_malloc(g_bytes_get_size(bytes) + 1);
memcpy(answer, g_bytes_get_data(bytes, NULL), g_bytes_get_size(bytes));
answer[g_bytes_get_size(bytes)] = '\0';
char *sdp_body = g_malloc(g_bytes_get_size(bytes) + 1);
memcpy(sdp_body, g_bytes_get_data(bytes, NULL), g_bytes_get_size(bytes));
sdp_body[g_bytes_get_size(bytes)] = '\0';
g_bytes_unref(bytes);
if(strstr(answer, "v=0\r\n") != answer) {
WHEP_LOG(LOG_ERR, "Invalid SDP answer\n");
if(strstr(sdp_body, "v=0\r\n") != sdp_body) {
WHEP_LOG(LOG_ERR, "Invalid SDP\n");
g_object_unref(session.msg);
g_object_unref(session.http_conn);
whep_disconnect("SDP error");
Expand Down Expand Up @@ -834,14 +895,14 @@ static void whep_connect(GstWebRTCSessionDescription *offer) {
g_source_unref(patch_timer);
}

/* Process the SDP answer */
WHEP_PREFIX(LOG_INFO, "Received SDP answer (%zu bytes)\n", strlen(answer));
WHEP_LOG(LOG_VERB, "%s\n", answer);
/* Process the SDP offer or answer */
WHEP_PREFIX(LOG_INFO, "Received SDP (%zu bytes)\n", strlen(sdp_body));
WHEP_LOG(LOG_VERB, "%s\n", sdp_body);

/* Check if there are any candidates in the SDP: we'll need to fake trickles in case */
if(strstr(answer, "candidate") != NULL) {
if(strstr(sdp_body, "candidate") != NULL) {
int mlines = 0, i = 0;
gchar **lines = g_strsplit(answer, "\r\n", -1);
gchar **lines = g_strsplit(sdp_body, "\r\n", -1);
gchar *line = NULL;
while(lines[i] != NULL) {
line = lines[i];
Expand All @@ -868,22 +929,23 @@ static void whep_connect(GstWebRTCSessionDescription *offer) {
WHEP_LOG(LOG_ERR, "Error initializing SDP object (%d)\n", ret);
g_object_unref(session.msg);
g_object_unref(session.http_conn);
g_free(answer);
g_free(sdp_body);
whep_disconnect("SDP error");
return;
}
ret = gst_sdp_message_parse_buffer((guint8 *)answer, strlen(answer), sdp);
ret = gst_sdp_message_parse_buffer((guint8 *)sdp_body, strlen(sdp_body), sdp);
g_object_unref(session.msg);
g_object_unref(session.http_conn);
g_free(answer);
g_free(sdp_body);
if(ret != GST_SDP_OK) {
/* Something went wrong */
gst_sdp_message_free(sdp);
WHEP_LOG(LOG_ERR, "Error parsing SDP buffer (%d)\n", ret);
whep_disconnect("SDP error");
return;
}
GstWebRTCSessionDescription *gst_sdp = gst_webrtc_session_description_new(GST_WEBRTC_SDP_TYPE_ANSWER, sdp);
GstWebRTCSessionDescription *gst_sdp = gst_webrtc_session_description_new(
status == 201 ? GST_WEBRTC_SDP_TYPE_ANSWER : GST_WEBRTC_SDP_TYPE_OFFER, sdp);

/* Set remote description on our pipeline */
WHEP_PREFIX(LOG_INFO, "Setting remote description\n");
Expand All @@ -892,6 +954,93 @@ static void whep_connect(GstWebRTCSessionDescription *offer) {
gst_promise_interrupt(promise);
gst_promise_unref(promise);
gst_webrtc_session_description_free(gst_sdp);

if(status == 201) {
/* Client-side offer, we're done */
return;
}
/* If we got here, we're in server-sent offer mode, which means we
* must create an answer to send back via an HTTP PATCH request */
/* Finally, create an answer to send back */
WHEP_PREFIX(LOG_INFO, "Creating answer\n");
state = WHEP_STATE_ANSWER_PREPARED;
promise = gst_promise_new_with_change_func(whep_answer_available, NULL, NULL);
g_signal_emit_by_name(pc, "create-answer", NULL, promise);
}

/* Helper method to send our answer to the WHEP resource */
static void whep_answer(GstWebRTCSessionDescription *answer) {
/* Convert the SDP object to a string */
char *sdp_answer = gst_sdp_message_as_text(answer->sdp);
WHEP_PREFIX(LOG_INFO, "Sending SDP answer (%zu bytes)\n", strlen(sdp_answer));

/* If we're not trickling, add our candidates to the SDP */
if(no_trickle) {
/* Prepare the candidate attributes */
char attributes[4096], expanded_sdp[8192];
attributes[0] = '\0';
expanded_sdp[0] = '\0';
char *candidate = NULL;
while((candidate = g_async_queue_try_pop(candidates)) != NULL) {
WHEP_PREFIX(LOG_VERB, "Adding candidate to SDP: %s\n", candidate);
g_strlcat(attributes, "a=", sizeof(attributes));
g_strlcat(attributes, candidate, sizeof(attributes));
g_strlcat(attributes, "\r\n", sizeof(attributes));
g_free(candidate);
}
/* Add them to all m-lines */
int mlines = 0, i = 0;
gchar **lines = g_strsplit(sdp_answer, "\r\n", -1);
gchar *line = NULL;
while(lines[i] != NULL) {
line = lines[i];
if(strstr(line, "m=") == line) {
/* New m-line */
mlines++;
if(mlines > 1)
g_strlcat(expanded_sdp, attributes, sizeof(expanded_sdp));
}
if(strlen(line) > 2) {
g_strlcat(expanded_sdp, line, sizeof(expanded_sdp));
g_strlcat(expanded_sdp, "\r\n", sizeof(expanded_sdp));
}
i++;
}
g_clear_pointer(&lines, g_strfreev);
g_strlcat(expanded_sdp, attributes, sizeof(expanded_sdp));
g_free(sdp_answer);
sdp_answer = g_strdup(expanded_sdp);
}
WHEP_LOG(LOG_VERB, "%s\n", sdp_answer);

/* Partially parse the SDP to find ICE credentials and the mid for the bundle m-line */
if(!whep_parse_sdp(sdp_answer)) {
whep_disconnect("SDP error");
return;
}

/* Create an HTTP connection */
whep_http_session session = { 0 };
guint status = whep_http_send(&session, "PATCH", (char *)resource_url, sdp_answer, "application/sdp", NULL);
g_free(sdp_answer);
if(status != 200 && status != 204) {
/* Didn't get the success we were expecting */
WHEP_LOG(LOG_ERR, " [%u] %s\n", status, status ? soup_message_get_reason_phrase(session.msg) : "HTTP error");
g_object_unref(session.msg);
g_object_unref(session.http_conn);
whep_disconnect("HTTP error");
return;
}
/* Check if there's an ETag we should send in upcoming requests */
const char *etag = soup_message_headers_get_one(soup_message_get_response_headers(session.msg), "etag");
if(etag == NULL) {
WHEP_LOG(LOG_WARN, "No ETag header, won't be able to set If-Match when trickling\n");
} else {
latest_etag = g_strdup(etag);
}

/* Negotiation done */
WHEP_PREFIX(LOG_INFO, "Negotiation completed\n");
}

/* Helper method to disconnect from the WHEP endpoint */
Expand Down Expand Up @@ -1012,8 +1161,8 @@ static guint whep_http_send(whep_http_session *session, char *method,
}

/* Helper method to parse SDP offers and extract stuff we need */
static gboolean whep_parse_offer(char *sdp_offer) {
gchar **parts = g_strsplit(sdp_offer, "\n", -1);
static gboolean whep_parse_sdp(char *sdp_object) {
gchar **parts = g_strsplit(sdp_object, "\n", -1);
gboolean mline = FALSE, success = TRUE, done = FALSE;
if(parts) {
int index = 0;
Expand Down