Skip to content

Commit

Permalink
Fixes #1496: Initial approach to implementing protocol observers (#1497)
Browse files Browse the repository at this point in the history
Creates subdirectory for observer source code. Implements stub
HTTP/1.1 and HTTP/2.0 observers. Adds proof-of-concept protocol
classification code to TCP observer.

Closes #1496
  • Loading branch information
kgiusti committed May 8, 2024
1 parent f93c6da commit 4f1053b
Show file tree
Hide file tree
Showing 9 changed files with 486 additions and 65 deletions.
22 changes: 11 additions & 11 deletions include/qpid/dispatch/protocol_observer.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
*/

#include <qpid/dispatch/buffer.h>
#include <qpid/dispatch/protocols.h>
#include <qpid/dispatch/vanflow.h>

/**
Expand Down Expand Up @@ -68,7 +69,7 @@ void qdpo_config_add_address(qdpo_config_t *config, const char *field, const cha


typedef struct qdpo_t qdpo_t;
typedef void* qdpo_transport_handle_t;
typedef struct qdpo_transport_handle_t qdpo_transport_handle_t;

/**
* Create a new protocol observer. Protocol observers take raw octets and attempt to detect the application protocol
Expand All @@ -78,7 +79,7 @@ typedef void* qdpo_transport_handle_t;
* @param config Configuration returned by qdpo_config
* @return qdpo_t* Newly allocated observer
*/
qdpo_t *protocol_observer(const char *base, qdpo_config_t *config);
qdpo_t *protocol_observer(qd_protocol_t base, qdpo_config_t *config);

/**
* Free an allocated observer
Expand All @@ -88,33 +89,32 @@ qdpo_t *protocol_observer(const char *base, qdpo_config_t *config);
void qdpo_free(qdpo_t *observer);

/**
* Provide the first buffer for a new connection. This is payload sent from the client of the connection.
* Create an observer for a new connection.
*
* @param observer The observer returned by protocol_observer
* @param vflow The vanflow record for the client-side transport flow
* @param transport_context A context unique to this connections transport (used in callbacks)
* @param buf The buffer containing the protocol payload
* @param offset The offset into the buffer where the first protocol octet is found
* @return void* A transport handle that references the observer's state for this connection
* @param conn_id The routers connection identifier associated with this flow.
* @return A transport handle that references the observer's state for this connection
*/
qdpo_transport_handle_t qdpo_first(qdpo_t *observer, vflow_record_t *vflow, void *transport_context, qd_buffer_t *buf, size_t offset);
qdpo_transport_handle_t *qdpo_begin(qdpo_t *observer, vflow_record_t *vflow, void *transport_context, uint64_t conn_id);

/**
* Provide subsequent payload data to an already established connection.
*
* @param transport_handle The handle returned by qdpo_first
* @param from_client True if this payload is from the client, false if from the server
* @param buf The buffer containing the protocol payload
* @param offset The offset into the buffer where the next protocol octet is found
* @param data The raw protocol data octets
* @param length The length of the data in octets
*/
void qdpo_data(qdpo_transport_handle_t transport_handle, bool from_client, qd_buffer_t *buf, size_t offset);
void qdpo_data(qdpo_transport_handle_t *transport_handle, bool from_client, const unsigned char *data, size_t length);

/**
* Indicate the end of a connection.
*
* @param connection_handle The handle returned by qdpo_first. This handle
* should not be used after making this call.
*/
void qdpo_end(qdpo_transport_handle_t transport_handle);
void qdpo_end(qdpo_transport_handle_t *transport_handle);

#endif
5 changes: 4 additions & 1 deletion src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ set(qpid_dispatch_SOURCES
adaptors/amqp/qd_connection.c
adaptors/amqp/qd_connector.c
adaptors/amqp/server_config.c
observers/protocol_observer.c
observers/tcp_observer.c
observers/http1_observer.c
observers/http2_observer.c
alloc.c
alloc_pool.c
aprintf.c
Expand Down Expand Up @@ -87,7 +91,6 @@ set(qpid_dispatch_SOURCES
policy.c
policy_spec.c
protocol_adaptor.c
protocol_observer.c
proton_utils.c
posix/threading.c
python_embedded.c
Expand Down
76 changes: 46 additions & 30 deletions src/adaptors/tcp/tcp_adaptor.c
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ static void TL_setup_listener(qd_tcp_listener_t *li)
// TODO - add configuration to the listener to influence whether and how the observer is set up.
//
li->protocol_observer_config = qdpo_config(0, true);
li->protocol_observer = protocol_observer("tcp", li->protocol_observer_config);
li->protocol_observer = protocol_observer(QD_PROTOCOL_TCP, li->protocol_observer_config);

//
// Create an adaptor listener. This listener will automatically create a listening socket when there is at least one
Expand Down Expand Up @@ -489,6 +489,11 @@ static void free_connection_IO(void *context)
sys_mutex_unlock(&conn->activation_lock);
// Do NOT free the core_activation lock since the core may be holding it

if (conn->observer_handle) {
qdpo_end(conn->observer_handle);
conn->observer_handle = 0;
}

if (conn->common.parent) {
if (conn->common.parent->context_type == TL_LISTENER) {
qd_tcp_listener_t *listener = (qd_tcp_listener_t*) conn->common.parent;
Expand All @@ -513,7 +518,7 @@ static void free_connection_IO(void *context)
}
}

// Pass connector to Core for final deallocation. The Core will free the activation_lock and the related flags. See
// Pass connection to Core for final deallocation. The Core will free the activation_lock and the related flags. See
// qdr_core_free_tcp_resource_CT()
free_tcp_resource(&conn->common);
}
Expand Down Expand Up @@ -758,7 +763,7 @@ static uint64_t produce_read_buffers_XSIDE_IO(qd_tcp_connection_t *conn, qd_mess
if (qd_buffer_size(buf) > 0) {
DEQ_INSERT_TAIL(qd_buffers, buf);
if (conn->listener_side && !!conn->observer_handle) {
qdpo_data(conn->observer_handle, true, buf, 0);
qdpo_data(conn->observer_handle, true, qd_buffer_base(buf), qd_buffer_size(buf));
}
} else {
qd_buffer_free(buf);
Expand Down Expand Up @@ -797,7 +802,7 @@ static uint64_t consume_write_buffers_XSIDE_IO(qd_tcp_connection_t *conn, qd_mes
qd_buffer_t *buf = DEQ_HEAD(buffers);
for (size_t i = 0; i < actual; i++) {
if (conn->listener_side && !!conn->observer_handle) {
qdpo_data(conn->observer_handle, false, buf, 0);
qdpo_data(conn->observer_handle, false, qd_buffer_base(buf), qd_buffer_size(buf));
}
raw_buffers[i].context = (uintptr_t) buf;
raw_buffers[i].bytes = (char*) qd_buffer_base(buf);
Expand All @@ -819,11 +824,9 @@ static uint64_t consume_write_buffers_XSIDE_IO(qd_tcp_connection_t *conn, qd_mes
// output message buffers and will free them as they are processed. Due to that we need to make a copy of these buffers
// in order to avoid freeing buffers that are part of the message (double-free).
//
static uint64_t copy_message_body_TLS_XSIDE_IO(qd_tcp_connection_t *conn, qd_message_t *stream, qd_buffer_list_t *buffers, size_t limit)
static void copy_message_body_TLS_XSIDE_IO(qd_tcp_connection_t *conn, qd_message_t *stream, qd_buffer_list_t *buffers, size_t limit)
{
size_t offset = 0;
uint64_t octets = 0;
const bool observe = (conn->listener_side && !!conn->observer_handle);

assert(conn->tls);

Expand All @@ -838,12 +841,8 @@ static uint64_t copy_message_body_TLS_XSIDE_IO(qd_tcp_connection_t *conn, qd_mes
qd_buffer_t *clone = qd_buffer();
clone->size = size;
memcpy(qd_buffer_base(clone), qd_buffer_base(conn->outbound_body) + offset, size);
if (observe) {
qdpo_data(conn->observer_handle, false, clone, 0);
}
DEQ_INSERT_TAIL(*buffers, clone);
}
octets += size;
offset = 0;
conn->outbound_body = DEQ_NEXT(conn->outbound_body);
}
Expand All @@ -852,8 +851,6 @@ static uint64_t copy_message_body_TLS_XSIDE_IO(qd_tcp_connection_t *conn, qd_mes
conn->outbound_body_complete = true;
qd_message_release_raw_body(stream);
}

return octets;
}

static uint64_t consume_message_body_XSIDE_IO(qd_tcp_connection_t *conn, qd_message_t *stream)
Expand All @@ -879,16 +876,20 @@ static uint64_t consume_message_body_XSIDE_IO(qd_tcp_connection_t *conn, qd_mess
// Note: There may be a non-zero offset only on the first body buffer. It is assumed that
// every subsequent buffer will have an offset of 0.
//
const bool observe = conn->listener_side && !!conn->observer_handle;
while (!!conn->outbound_body && pn_raw_connection_write_buffers_capacity(conn->raw_conn) > 0) {
if (conn->listener_side && !!conn->observer_handle) {
qdpo_data(conn->observer_handle, false, conn->outbound_body, offset);
unsigned char *bytes = qd_buffer_base(conn->outbound_body) + offset;
size_t size = qd_buffer_size(conn->outbound_body) - offset;

if (observe) {
qdpo_data(conn->observer_handle, false, bytes, size);
}
pn_raw_buffer_t raw_buffer;
raw_buffer.context = 0;
raw_buffer.bytes = (char*) qd_buffer_base(conn->outbound_body);
raw_buffer.capacity = qd_buffer_capacity(conn->outbound_body);
raw_buffer.size = qd_buffer_size(conn->outbound_body) - offset;
raw_buffer.offset = offset;
raw_buffer.bytes = (char*) bytes;
raw_buffer.capacity = 0;
raw_buffer.size = size;
raw_buffer.offset = 0;
octets += raw_buffer.size;
pn_raw_connection_write_buffers(conn->raw_conn, &raw_buffer, 1);
conn->outbound_body = DEQ_NEXT(conn->outbound_body);
Expand Down Expand Up @@ -1429,9 +1430,8 @@ static bool manage_flow_XSIDE_IO(qd_tcp_connection_t *conn)
//
static int64_t tls_consume_data_buffers(void *context, qd_buffer_list_t *buffers, size_t limit)
{
qd_tcp_connection_t *conn = (qd_tcp_connection_t *) context;
const bool observe = conn->listener_side && !!conn->observer_handle;
uint64_t octets = 0;
qd_tcp_connection_t *conn = (qd_tcp_connection_t *) context;
uint64_t octets = 0;

assert(limit > 0);
assert(DEQ_IS_EMPTY(*buffers));
Expand All @@ -1440,7 +1440,7 @@ static int64_t tls_consume_data_buffers(void *context, qd_buffer_list_t *buffers
return octets;

if (!conn->outbound_body_complete) {
octets = copy_message_body_TLS_XSIDE_IO(conn, conn->outbound_stream, buffers, limit);
copy_message_body_TLS_XSIDE_IO(conn, conn->outbound_stream, buffers, limit);
assert(limit >= DEQ_SIZE(*buffers));
limit -= DEQ_SIZE(*buffers);
}
Expand All @@ -1449,19 +1449,21 @@ static int64_t tls_consume_data_buffers(void *context, qd_buffer_list_t *buffers
qd_buffer_list_t tmp = DEQ_EMPTY;
qd_message_consume_buffers(conn->outbound_stream, &tmp, limit);
assert(limit >= DEQ_SIZE(tmp));
limit -= DEQ_SIZE(tmp);
qd_buffer_t *buf = DEQ_HEAD(tmp);
DEQ_APPEND(*buffers, tmp);
}

if (!DEQ_IS_EMPTY(*buffers)) {
const bool observe = conn->listener_side && !!conn->observer_handle;

qd_buffer_t *buf = DEQ_HEAD(*buffers);
while (buf) {
octets += qd_buffer_size(buf);
if (observe) {
qdpo_data(conn->observer_handle, false, buf, 0);
qdpo_data(conn->observer_handle, false, qd_buffer_base(buf), qd_buffer_size(buf));
}
buf = DEQ_NEXT(buf);
}
DEQ_APPEND(*buffers, tmp);
}

if (octets) {
qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG,
"[C%"PRIu64"] TLS consumed %"PRIu64" cleartext octets from stream", conn->conn_id, octets);

Expand Down Expand Up @@ -1536,6 +1538,7 @@ static bool manage_tls_flow_XSIDE_IO(qd_tcp_connection_t *conn)
const int tls_status = qd_tls_do_io2(conn->tls, conn->raw_conn, tls_consume_data_buffers, conn,
(can_produce) ? &decrypted_buffers : 0,
&decrypted_octets);

//
// Process inbound cleartext data.
//
Expand All @@ -1544,6 +1547,15 @@ static bool manage_tls_flow_XSIDE_IO(qd_tcp_connection_t *conn)
more_work = true;
conn->inbound_octets += decrypted_octets;
vflow_set_uint64(conn->common.vflow, VFLOW_ATTRIBUTE_OCTETS, conn->inbound_octets);

if (conn->listener_side && !!conn->observer_handle) {
qd_buffer_t *buf = DEQ_HEAD(decrypted_buffers);
while (buf) {
qdpo_data(conn->observer_handle, true, qd_buffer_base(buf), qd_buffer_size(buf));
buf = DEQ_NEXT(buf);
}
}

qd_message_produce_buffers(conn->inbound_stream, &decrypted_buffers);

qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] %cSIDE TLS read: Produced %"PRIu64" octets into stream", conn->conn_id, conn->listener_side ? 'L' : 'C', decrypted_octets);
Expand Down Expand Up @@ -2013,7 +2025,6 @@ static void on_accept(qd_adaptor_listener_t *adaptor_listener, pn_listener_t *pn
//
qd_tcp_listener_incref(listener);


sys_mutex_init(&conn->activation_lock);
sys_atomic_init(&conn->core_activation, 0);
sys_atomic_init(&conn->raw_opened, 0);
Expand All @@ -2036,8 +2047,13 @@ static void on_accept(qd_adaptor_listener_t *adaptor_listener, pn_listener_t *pn
vflow_set_uint64(listener->common.vflow, VFLOW_ATTRIBUTE_FLOW_COUNT_L4, listener->connections_opened);
sys_mutex_unlock(&listener->lock);

if (listener->protocol_observer) {
conn->observer_handle = qdpo_begin(listener->protocol_observer, conn->common.vflow, conn, conn->conn_id);
}

conn->raw_conn = pn_raw_connection();
pn_raw_connection_set_context(conn->raw_conn, &conn->context);
// Note: this will trigger the connection's event handler on another thread:
pn_listener_raw_accept(pn_listener, conn->raw_conn);
}

Expand Down
2 changes: 1 addition & 1 deletion src/adaptors/tcp/tcp_adaptor.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ struct qd_tcp_listener_t {
qd_adaptor_config_t *adaptor_config;
qd_tls_domain_t *tls_domain;
qd_adaptor_listener_t *adaptor_listener;
qd_tcp_connection_list_t connections;
qd_tcp_connection_list_t connections;
qdpo_config_t *protocol_observer_config;
qdpo_t *protocol_observer;
uint64_t connections_opened;
Expand Down
48 changes: 48 additions & 0 deletions src/observers/http1_observer.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "private.h"

#include <inttypes.h>


static void http1_observe(qdpo_transport_handle_t *th, bool from_client, const unsigned char *data, size_t length)
{
qd_log(LOG_HTTP_ADAPTOR, QD_LOG_DEBUG,
"[C%" PRIu64 "] HTTP/1.1 observer classifying protocol: %zu %s octets", th->conn_id, length, from_client ? "client" : "server");
}



void qdpo_http1_init(qdpo_transport_handle_t *th)
{
qd_log(LOG_HTTP_ADAPTOR, QD_LOG_DEBUG, "[C%" PRIu64 "] HTTP/1.1 observer initialized", th->conn_id);

th->protocol = QD_PROTOCOL_HTTP1;
th->observe = http1_observe;
th->http1.tbd = 42; // whatever;

}

void qdpo_http1_final(qdpo_transport_handle_t *th)
{
qd_log(LOG_HTTP_ADAPTOR, QD_LOG_DEBUG, "[C%" PRIu64 "] HTTP/1.1 observer finalized", th->conn_id);
th->observe = 0;
}

48 changes: 48 additions & 0 deletions src/observers/http2_observer.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "private.h"

#include <inttypes.h>


static void http2_observe(qdpo_transport_handle_t *th, bool from_client, const unsigned char *data, size_t length)
{
qd_log(LOG_HTTP_ADAPTOR, QD_LOG_DEBUG,
"[C%" PRIu64 "] HTTP/2.0 observer classifying protocol: %zu %s octets", th->conn_id, length, from_client ? "client" : "server");

}


void qdpo_http2_init(qdpo_transport_handle_t *th)
{
qd_log(LOG_HTTP_ADAPTOR, QD_LOG_DEBUG, "[C%" PRIu64 "] HTTP/2.0 observer initialized", th->conn_id);

th->protocol = QD_PROTOCOL_HTTP2;
th->observe = http2_observe;
th->http2.tbd = 42; // whatever
}


void qdpo_http2_final(qdpo_transport_handle_t *th)
{
qd_log(LOG_HTTP_ADAPTOR, QD_LOG_DEBUG, "[C%" PRIu64 "] HTTP/2.0 observer finalized", th->conn_id);
th->observe = 0;
}

0 comments on commit 4f1053b

Please sign in to comment.