1. 背景と目的
現状
porter ライブラリの TCP サポートは以下の 2 種別のみ。
| 種別 | 方向 | 接続数 |
| POTR_TYPE_TCP | 単方向 (SENDER → RECEIVER) | 1:1 |
| POTR_TYPE_TCP_BIDIR | 双方向 (SENDER ↔ RECEIVER) | 1:1 |
RECEIVER は 1 台の SENDER との接続しか同時に維持できない。これは receiver_accept_loop() が accept() → join_recv_thread() (切断まで待機) → 次の accept() という逐次処理になっているためである。
解決したい課題
複数の SENDER が 1 台の RECEIVER へ同時接続するシナリオ(例: 複数センサー/エージェントがデータ収集サーバーへ送信)に対応できない。
目的
N:1 TCP 通信を可能にする新通信種別を追加する。
2. 追加する通信種別
POTR_TYPE_TCP_N1 = 10,
POTR_TYPE_TCP_BIDIR_N1 = 11,
SENDER 側の動作
SENDER 側の変更は不要。既存の POTR_TYPE_TCP または POTR_TYPE_TCP_BIDIR をそのまま使用して接続する。
RECEIVER 側の動作変更
| 機能 | 既存 1:1 | N:1 |
| 同時接続数 | 1 | max_peers まで |
| ピア識別 | POTR_PEER_NA | peer_id で各 SENDER を識別 |
| RECEIVER からの送信 (BIDIR) | 単一接続へ | peer_id 指定で特定 SENDER へ |
| マルチパス | 最大 POTR_MAX_PATH パス/接続 | 各 SENDER が最大 POTR_MAX_PATH パスで接続可 |
3. アーキテクチャ設計
3.1 accept ループのモデル変更
既存の 1:1 accept ループ (逐次処理)
[accept スレッド (path 0)]
accept() ────────── conn_A が来るまで待機
│
└─ ctx->tcp_conn_fd[0] = conn_A
start_connected_threads() ← recv/health スレッド起動
join_recv_thread() ← conn_A の切断まで待機 (ブロック)
stop_connected_threads()
← この間は conn_B を accept できない
│
└─ ループ先頭へ: 次の accept()
N:1 accept ループ (並行処理)
[accept スレッド (path 0)]
accept() ──── conn_A が来る
│
└─ peer_create_tcp(conn_A) → peer_A
tcp_peer_recv_thread_start(peer_A) ← 非同期
tcp_peer_health_thread_start(peer_A) ← 非同期 (BIDIR_N1 のみ)
← join しない。即座に次の accept へ
│
accept() ──── conn_B が来る (conn_A の切断を待たない)
│
└─ peer_create_tcp(conn_B) → peer_B
...
[peer_A の recv スレッド] [peer_B の recv スレッド]
データ受信 → callback(peer_id=A) データ受信 → callback(peer_id=B)
切断検知 → peer_free_tcp(peer_A) 切断検知 → peer_free_tcp(peer_B)
3.2 スレッド構成
既存 1:1 TCP (1 サービス当たり)
accept スレッド × n_path
recv スレッド × n_path
health スレッド × n_path (BIDIR or SENDER のみ)
send スレッド × 1 (BIDIR or SENDER のみ)
N:1 TCP (1 サービス当たり)
accept スレッド × n_path ← ピアをまたいで共有
recv スレッド × n_path × peer数 ← per-peer, per-path
health スレッド × n_path × peer数 ← per-peer, per-path (BIDIR_N1 のみ)
send スレッド × 1 ← 全ピア共有 (BIDIR_N1 のみ)
最大スレッド数: n_path(4) × max_peers × 2 + n_path(4) + 1 (BIDIR_N1 の場合)
注意: max_peers が大きいとスレッド数が増加する。TCP N:1 向けのデフォルト max_peers は 32 程度を推奨する。大量接続が必要な場合は将来的に epoll/IOCP ベースへの移行を検討すること。
4. データ構造の変更
ファイル: prod/porter/include/porter_type.h
typedef enum {
@ POTR_TYPE_BROADCAST_RAW
1:N 通信 RAW モード (UDP ブロードキャスト)。
@ POTR_TYPE_BROADCAST
1:N 通信 (UDP ブロードキャスト)。
@ POTR_TYPE_UNICAST_BIDIR_N1
N:1 双方向通信 (UDP ユニキャスト)。
@ POTR_TYPE_UNICAST
1:1 通信 (UDP ユニキャスト)。
@ POTR_TYPE_MULTICAST_RAW
1:N 通信 RAW モード (UDP マルチキャスト)。
@ POTR_TYPE_TCP_BIDIR
TCP 双方向通信 (両端が potrSend 可)。
@ POTR_TYPE_TCP
TCP ユニキャスト通信 (単方向: SENDER のみ potrSend 可)。
@ POTR_TYPE_UNICAST_RAW
1:1 通信 RAW モード (UDP ユニキャスト)。
@ POTR_TYPE_UNICAST_BIDIR
双方向 1:1 通信 (UDP ユニキャスト)。
@ POTR_TYPE_MULTICAST
1:N 通信 (UDP マルチキャスト)。
ファイル: prod/porter/libsrc/porter/potrContext.h
既存の UDP 専用フィールド (dest_addr[], n_paths, path_last_recv_sec[] 等) はそのまま保持し、末尾に TCP N:1 専用フィールドを追加する。
{
volatile int tcp_peer_running;
#define POTR_MAX_PATH
マルチパスの最大パス数。
uint32_t PotrPeerId
ピア識別子。
struct PotrPeerContext_ PotrPeerContext
N:1 モードにおける個別ピアのコンテキスト。
#define POTR_NACK_DEDUP_SLOTS
NACK 重複抑制リングバッファのスロット数 (POTR_MAX_PATH × 2)。
pthread_cond_t PotrCondVar
pthread_mutex_t PotrMutex
セッションコンテキスト構造体。PotrHandle の実体。
int reorder_pending
リオーダー待機中 (1: 待機中, 0: 待機なし)。
uint8_t nack_dedup_next
次に書き込むスロット。
PotrWindow send_window
送信ウィンドウ (NACK 再送用)。
int32_t last_recv_tv_nsec
最終受信時刻 ナノ秒部。
int32_t reorder_deadline_nsec
タイムアウト期限 ナノ秒部。
size_t frag_buf_len
現在のデータ長。
int frag_compressed
圧縮フラグ (非 0: 圧縮あり)。
int32_t session_tv_nsec
自セッション開始時刻 ナノ秒部。
int64_t reorder_deadline_sec
タイムアウト期限 秒部 (CLOCK_MONOTONIC)。
int64_t session_tv_sec
自セッション開始時刻 秒部。
int active
1: 有効スロット, 0: 空き。
uint8_t * frag_buf
フラグメント結合バッファ (動的確保)。
int64_t last_recv_tv_sec
最終受信時刻 秒部 (CLOCK_MONOTONIC)。0 = 未受信。
PotrWindow recv_window
受信ウィンドウ (順序整列)。
int n_paths
アクティブパス数。ループ境界には使わず管理カウンタとして使用する。
PotrNackDedupEntry nack_dedup_buf[POTR_NACK_DEDUP_SLOTS]
NACK 重複抑制バッファ。
uint32_t reorder_nack_num
待機中の欠番通番。
volatile int health_alive
疎通状態 (1: alive, 0: dead/未接続)。
struct sockaddr_in dest_addr[POTR_MAX_PATH]
送信先ソケットアドレス (インデックス = ctx->sock[] の添字)。未使用スロットは sin_family == 0。
uint32_t peer_session_id
追跡中のピアセッション識別子。
uint32_t session_id
自セッション識別子 (乱数)。
PotrPeerId peer_id
外部公開用ピア識別子 (単調増加カウンタから付与)。
int64_t path_last_recv_sec[POTR_MAX_PATH]
パスごとの最終受信時刻 秒部。未使用スロットは 0。
int peer_session_known
ピアセッションが初期化済みか (0: 未初期化)。
PotrMutex send_window_mutex
send_window 保護 (送信・受信・ヘルスチェックスレッド競合)。
int32_t path_last_recv_nsec[POTR_MAX_PATH]
パスごとの最終受信時刻 ナノ秒部。
4.3 inline 判定関数の追加・変更
ファイル: prod/porter/libsrc/porter/potrContext.h
static inline int potr_is_tcp_n1_type(
PotrType t)
{
return t == POTR_TYPE_TCP_N1 || t == POTR_TYPE_TCP_BIDIR_N1;
}
{
|| t == POTR_TYPE_TCP_N1
|| t == POTR_TYPE_TCP_BIDIR_N1;
}
static int potr_is_tcp_type(PotrType t)
TCP 通信種別 (POTR_TYPE_TCP / POTR_TYPE_TCP_BIDIR) か判定する。
5. 新規・変更ファイル詳細
ファイル: prod/porter/libsrc/porter/potrPeerTable.h / .c
関数シグネチャ
const struct sockaddr_in *peer_addr,
int path_idx);
処理内容
1. 空きスロット確認 (n_peers >= max_peers なら NULL を返す)
2. 空きスロットに対して以下を初期化:
a. セッション生成 (peer_generate_session())
b. window_init(&peer->send_window, window_size, max_payload)
c. window_init(&peer->recv_window, window_size, max_payload)
d. POTR_MUTEX_INIT(peer->send_window_mutex)
e. POTR_MUTEX_INIT(peer->tcp_recv_window_mutex)
f. frag_buf = malloc(max_message_size) ; frag_buf_len = 0
g. 全 path の tcp_conn_fd を POTR_INVALID_SOCKET に設定
h. tcp_conn_fd[path_idx] = conn (今回 accept した path のみ設定)
i. tcp_last_ping_recv_ms[path_idx] = current_ms()
j. tcp_last_ping_req_recv_ms[path_idx] = current_ms()
k. 全 path の tcp_send_mutex, tcp_health_mutex, tcp_health_wakeup を初期化
l. tcp_peer_running = 1
m. ctx_back = ctx
n. peer_id = allocate_peer_id(ctx)
o. active = 1 ; ctx->n_peers++
3. ポインタを返す
TCP N:1 型の場合、既存の UDP N:1 用 cleanup に加えて以下を実行する。
前提: 呼び出し前に全 path の recv/health スレッドが停止済みであること。
・全 path の tcp_conn_fd が POTR_INVALID_SOCKET であることを確認 (assert)
・全 path の tcp_send_mutex を POTR_MUTEX_DESTROY
・全 path の tcp_health_mutex を POTR_MUTEX_DESTROY
・全 path の tcp_health_wakeup を POTR_CONDVAR_DESTROY
・tcp_recv_window_mutex を POTR_MUTEX_DESTROY
5.2 potrTcpPeerThread.c - 新規ファイル
ファイル: prod/porter/libsrc/porter/thread/potrTcpPeerThread.h / .c
per-peer の recv スレッドと health スレッドを実装する。既存の potrRecvThread.c / potrHealthThread.c のパケット処理ロジックを内部関数として再利用する。
API
int path_idx);
int path_idx);
recv スレッド関数の処理フロー
static void *tcp_peer_recv_thread_func(void *arg)
{
int path_idx = arg->path_idx;
while (peer->tcp_recv_running[path_idx] && peer->tcp_peer_running)
{
n = tcp_read_packet(peer->tcp_conn_fd[path_idx], buf, &pkt);
if (n <= 0) break;
tcp_process_packet_peer(ctx, peer, &pkt);
}
}
peer->tcp_health_running[path_idx] = 0;
POTR_CONDVAR_SIGNAL(peer->tcp_health_wakeup[path_idx]);
POTR_THREAD_JOIN(peer->tcp_health_thread[path_idx]);
{
shutdown(peer->tcp_conn_fd[path_idx], ...);
POTR_SOCKET_CLOSE(peer->tcp_conn_fd[path_idx]);
int remaining = 0;
for (k = 0; k < ctx->
n_path; k++)
if (remaining == 0) {
}
peer_free_tcp(ctx, peer);
}
}
return NULL;
}
@ POTR_EVENT_DISCONNECTED
切断を検知 (タイムアウト / FIN 受信 / REJECT 受信)。data=NULL, len=0。
@ POTR_EVENT_CONNECTED
送信者からの疎通を初検知 or 復帰。data=NULL, len=0。
#define POTR_INVALID_SOCKET
#define POTR_MUTEX_UNLOCK(m)
#define POTR_MUTEX_LOCK(m)
PotrRecvCallback callback
受信コールバック。
PotrMutex peers_mutex
ピアテーブル保護用ミューテックス。
health スレッド関数の処理フロー
既存の potr_tcp_health_thread_func() を per-peer 版に移植する。 ctx->tcp_conn_fd[path_idx] / ctx->tcp_last_ping_recv_ms を peer->tcp_conn_fd[path_idx] / peer->tcp_last_ping_recv_ms に置き換える。
static void *tcp_peer_health_thread_func(void *arg)
{
while (peer->tcp_health_running[path_idx] && peer->tcp_peer_running)
{
POTR_CONDVAR_TIMEDWAIT(peer->tcp_health_wakeup[path_idx],
peer->tcp_health_mutex[path_idx],
health_interval_ms);
if (!peer->tcp_health_running[path_idx]) break;
tcp_send_ping(peer->tcp_conn_fd[path_idx], &peer->tcp_send_mutex[path_idx], ctx);
if (now - peer->tcp_last_ping_recv_ms[path_idx] > health_timeout_ms) {
POTR_LOG(INFO,
"peer[%u] path[%d]: PING timeout, closing connection",
POTR_SOCKET_CLOSE(peer->tcp_conn_fd[path_idx]);
break;
}
}
return NULL;
}
static uint64_t health_get_ms(void)
#define POTR_LOG(level,...)
porter 内部ログ出力マクロ。
ファイル: prod/porter/libsrc/porter/thread/potrConnectThread.c
receiver_accept_n1_loop() 新規追加
static void receiver_accept_n1_loop(
struct PotrContext_ *ctx,
int path_idx)
{
int is_bidir = (ctx->
service.
type == POTR_TYPE_TCP_BIDIR_N1);
{
struct sockaddr_in peer_addr;
socklen_t peer_len = (socklen_t)sizeof(peer_addr);
char peer_addr_str[INET_ADDRSTRLEN];
(struct sockaddr *)&peer_addr, &peer_len);
{
continue;
}
inet_ntop(AF_INET, &peer_addr.sin_addr, peer_addr_str, sizeof(peer_addr_str));
if (tcp_n1_is_src_filtered(ctx, path_idx, &peer_addr))
{
"rejected (src filter) from %s:%u",
peer_addr_str, (unsigned)ntohs(peer_addr.sin_port));
POTR_SOCKET_CLOSE(conn);
continue;
}
"TCP N:1 accepted from %s:%u",
peer_addr_str, (unsigned)ntohs(peer_addr.sin_port));
peer = peer_create_tcp(ctx, conn, &peer_addr, path_idx);
if (peer == NULL)
{
"max_peers reached, rejected from %s:%u",
peer_addr_str, (unsigned)ntohs(peer_addr.sin_port));
POTR_SOCKET_CLOSE(conn);
continue;
}
"peer[%u] created",
if (tcp_peer_recv_thread_start(ctx, peer, path_idx) !=
POTR_SUCCESS)
{
peer_free_tcp(ctx, peer);
POTR_SOCKET_CLOSE(conn);
continue;
}
if (is_bidir)
{
if (tcp_peer_health_thread_start(ctx, peer, path_idx) !=
POTR_SUCCESS)
{
"connect_thread: tcp_peer_health_thread_start failed");
POTR_SOCKET_CLOSE(conn);
continue;
}
}
}
}
#define POTR_SUCCESS
成功の戻り値を表す定数。
@ POTR_TRACE_ERROR
エラー。操作の失敗を記録。TRACE_LV_ERROR (1) と同値。
@ POTR_TRACE_INFO
情報。TRACE_LV_INFO (3) と同値。
@ POTR_TRACE_VERBOSE
詳細情報 (デバッグ)。TRACE_LV_VERBOSE (4) と同値。
@ POTR_TRACE_WARNING
警告。回復可能な異常を記録。TRACE_LV_WARNING (2) と同値。
PotrSocket tcp_listen_sock[POTR_MAX_PATH]
RECEIVER: listen ソケット (path ごと)。
volatile int connect_thread_running[POTR_MAX_PATH]
connect スレッド実行フラグ (1: 実行中, 0: 停止)。path ごと。
PotrServiceDef service
サービス定義。
int64_t service_id
サービス ID。
{
{
}
{
receiver_accept_n1_loop(ctx, path_idx);
}
else
{
}
}
static void * connect_thread_func(void *arg)
static void receiver_accept_loop(struct PotrContext_ *ctx, int path_idx)
static void sender_connect_loop(struct PotrContext_ *ctx, int path_idx)
PotrRole role
役割 (POTR_ROLE_SENDER / POTR_ROLE_RECEIVER)。
int is_multi_peer
1: N:1 モード (src_addr/src_port 省略), 0: 1:1 モード。
ファイル: prod/porter/libsrc/porter/thread/potrSendThread.c
PotrPayloadElem には既に peer_id フィールドが存在するため、 送信先ルーティングの変更のみで対応できる。
const uint8_t *wire_buf, size_t wire_len)
{
int k;
{
for (k = 0; k < ctx->
n_path; k++)
{
struct pollfd pfd = { peer->tcp_conn_fd[k], POLLOUT, 0 };
int pr = poll(&pfd, 1, 0);
if (pr > 0 && (pfd.revents & POLLOUT))
{
&peer->tcp_send_mutex[k],
{
"send_thread: peer[%u] path[%d]: tcp_send_all failed",
}
}
else
{
if (peer->tcp_buf_full_suppress_cnt[k]++ == 0)
{
"send_thread: peer[%u] path[%d]: send buffer full, "
"dropping packet", peer->
peer_id, k);
}
}
}
}
else
{
for (k = 0; k < peer->
n_paths; k++)
sendto(ctx->
sock[0], wire_buf, wire_len, 0,
}
}
static void flush_packed_peer(struct PotrContext_ *ctx, PotrPeerContext *peer, size_t packed_len)
static int tcp_send_all(PotrSocket fd, PotrMutex *mtx, const uint8_t *buf, size_t len)
PotrSocket sock[POTR_MAX_PATH]
各パスの UDP ソケット。
ファイル: prod/porter/libsrc/porter/api/potrOpenService.c
case POTR_TYPE_TCP_N1:
case POTR_TYPE_TCP_BIDIR_N1:
{
int is_bidir_n1 = (ctx->
service.
type == POTR_TYPE_TCP_BIDIR_N1);
{
for (i = 0; i < ctx->
n_path; i++) {
}
if (is_bidir_n1) {
goto error;
}
}
else
{
for (i = 0; i < ctx->
n_path; i++) {
}
}
break;
}
#define POTR_SEND_QUEUE_DEPTH
非同期送信キューの最大エントリ数のデフォルト値。設定ファイルの send_queue_depth で変更可能。メッセージがフラグメント化される場合、1 メッセージが複数エントリを占有する。
int potr_connect_thread_start(struct PotrContext_ *ctx)
TCP 接続管理スレッドを起動します (path 数分)。
static int open_socket_tcp_receiver(struct PotrContext_ *ctx, int path_idx)
int peer_table_init(struct PotrContext_ *ctx)
ピアテーブルを初期化する。
int potr_send_queue_init(PotrSendQueue *q, size_t depth, uint16_t max_payload)
#define POTR_MUTEX_INIT(m)
int potr_send_thread_start(struct PotrContext_ *ctx)
PotrGlobalConfig global
グローバル設定。
PotrSendQueue send_queue
非同期送信キュー。
PotrMutex tcp_state_mutex
tcp_state_cv 保護用ミューテックス。tcp_active_paths のカウンタ更新も保護。
PotrCondVar tcp_state_cv
切断通知・reconnect sleep の中断用条件変数。
uint16_t max_payload
最大ペイロード長 (バイト)。
ファイル: prod/porter/libsrc/porter/api/potrCloseService.c
case POTR_TYPE_TCP_N1:
case POTR_TYPE_TCP_BIDIR_N1:
{
{
for (i = 0; i < ctx->
n_path; i++) {
}
}
potr_connect_thread_join(ctx);
ctx->
peers[i].tcp_peer_running = 0;
for (k = 0; k < ctx->
n_path; k++) {
POTR_SOCKET_CLOSE(ctx->
peers[i].tcp_conn_fd[k]);
}
ctx->
peers[i].tcp_recv_running[k] = 0;
ctx->
peers[i].tcp_health_running[k] = 0;
POTR_CONDVAR_SIGNAL(ctx->
peers[i].tcp_health_wakeup[k]);
}
}
peer_table_destroy_tcp(ctx);
}
}
else
{
}
break;
}
void potr_connect_thread_stop(struct PotrContext_ *ctx)
TCP 接続管理スレッドを停止します。
void potr_send_queue_shutdown(PotrSendQueue *q)
void potr_send_queue_destroy(PotrSendQueue *q)
#define POTR_MUTEX_DESTROY(m)
void potr_send_thread_stop(struct PotrContext_ *ctx)
int max_peers
ピアテーブルサイズ (service.max_peers から取得)。
PotrPeerContext * peers
ピアテーブル (動的確保。max_peers エントリ)。
5.7 potrSend.c - BIDIR_N1 RECEIVER からの送信許可
ファイル: prod/porter/libsrc/porter/api/potrSend.c
}
#define POTR_ERROR
失敗の戻り値を表す定数。
ファイル: prod/porter/libsrc/porter/api/potrDisconnectPeer.c
{
if (peer != NULL && peer->
active)
{
for (k = 0; k < ctx->
n_path; k++) {
tcp_send_fin(peer->tcp_conn_fd[k], &peer->tcp_send_mutex[k], ctx);
shutdown(peer->tcp_conn_fd[k], SHUT_WR);
}
}
peer->tcp_peer_running = 0;
}
}
PotrPeerContext * peer_find_by_id(struct PotrContext_ *ctx, PotrPeerId peer_id)
peer_id でピアを検索する。
6. 設計上の懸念点とトレードオフ
追加する TCP N:1 フィールドのサイズ概算 (POTR_MAX_PATH=4 の場合):
| フィールド | サイズ (approx) |
| tcp_conn_fd[4] (int or SOCKET) | 16〜32 B |
| tcp_last_ping_recv_ms[4] | 32 B |
| tcp_last_ping_req_recv_ms[4] | 32 B |
| tcp_recv_thread[4] (pthread_t) | 32 B |
| tcp_health_thread[4] | 32 B |
| tcp_send_mutex[4] (pthread_mutex_t) | 160 B |
| tcp_health_mutex[4] | 160 B |
| tcp_health_wakeup[4] (pthread_cond_t) | 192 B |
| tcp_recv_window_mutex | 40 B |
| その他整数フィールド | 〜32 B |
| 合計 | 約 730〜760 B 増加 |
max_peers = 1024 の場合、ピアテーブル全体で約 750 KB の増加。
推奨: TCP N:1 向けのデフォルト max_peers を 32 程度に設定すること。
6.2 デッドロックリスクと回避策
以下の順序を厳守することでデッドロックを回避する。
recv スレッド (切断時):
1. tcp_health_running[k] = 0 + SIGNAL ← peers_mutex 外
2. JOIN(tcp_health_thread[k]) ← peers_mutex 外
3. LOCK(peers_mutex)
4. conn_fd クローズ + peer_free_tcp()
5. UNLOCK(peers_mutex)
health スレッド:
- peers_mutex を取得しない (取得不可の設計とする)
- ソケットクローズで recv スレッドへ切断を伝える
potrCloseService():
1. connect_thread_running = 0 (新規 accept を止める)
2. LOCK(peers_mutex) → tcp_peer_running = 0 + conn_fd クローズ → UNLOCK
3. peer_table_destroy_tcp() → 全スレッドを JOIN
6.3 ctx->tcp_active_paths カウンタの扱い
1:1 TCP の ctx->tcp_active_paths は N:1 では使用しない。 N:1 での送信可否判定は peer->tcp_conn_fd[k] != POTR_INVALID_SOCKET で行う。
6.4 スレッドスケーラビリティ
Thread-per-connection モデルでの最大スレッド数 (BIDIR_N1, n_path=4, max_peers=32):
accept スレッド: 4
recv スレッド: 128 (4 path × 32 peers)
health スレッド: 128 (4 path × 32 peers)
send スレッド: 1
合計: 261 スレッド
大量接続 (max_peers > 100) が必要な場合は、将来的に epoll (Linux) / IOCP (Windows) ベースのスレッドプールモデルへの移行を検討すること。
6.5 マルチパスにおけるセッション管理
6.5.1 問題の背景 (TCP 1:1 マルチパスの不具合)
本問題は N:1 固有ではなく、**既存の TCP 1:1 マルチパス実装にも存在した構造上の不具合**である。
UDP では recvfrom() がデータ受信・送信元アドレス取得・セッション ID 識別を原子的に行うため、同一 SENDER の再接続や追加パス接続を自然にセッション層で識別できる。 一方 TCP の accept() はソケット fd のみを返し、アプリケーション層のセッション識別子 (session_id, session_tv_sec, session_tv_nsec) は最初のパケットを受信するまで不明である。
このため、従来実装では accept() 直後に無条件で reset_connection_state() を呼んでいた。この関数は peer_session_known = 0 をリセットするが、他の path の recv スレッドが既にセッションデータを処理中である可能性があり、データ競合が発生していた。さらに、新セッションの接続なのか同一セッションの再接続・追加パスなのかを区別できなかった。
6.5.2 実装済みの修正: セッション層での対称化
TCP と UDP のセッション識別をセッション層レベルで対称にする修正を実装した (potrConnectThread.c, potrRecvThread.c, potrContext.h)。
設計の概要:
- accept スレッドが最初のパケットを先読みする accept() 直後、tcp_read_first_packet() でアプリケーション層の最初のパケットを受信する (タイムアウト付き)。パケット内の session triplet (session_id, session_tv_sec, session_tv_nsec) を取得する。
- session_establish_mutex によるシリアライズ 複数パスの accept スレッドが ctx->peer_session_* フィールドを同時に参照・更新しないよう、session_establish_mutex で排他制御する。
- セッション比較 (tcp_session_compare()) による 3 分類
| 結果 | 意味 | 処置 |
| TCP_SESSION_NEW | 新しいセッション (または初回接続) | 他の全アクティブパスを切断し reset_connection_state() を呼ぶ。その後 tcp_conn_fd[path_idx] を設定してスレッドを起動する。 |
| TCP_SESSION_SAME | 既存セッションの同一セッション ID | 追加パスとして接続する。reset_connection_state() は呼ばない。 |
| TCP_SESSION_OLD | 過去のセッション (期限切れ) | 接続を閉じてループを継続する。 |
- 先読みバッファの引き渡し accept スレッドが読み取った最初のパケットを tcp_first_pkt_buf[path_idx] / tcp_first_pkt_len[path_idx] に格納する。recv スレッドはループ開始時にこのバッファを先に処理し、通常の recv ループに入る。
追加されたフィールド (potrContext.h):
6.5.3 N:1 実装における注意事項
TCP N:1 の peer_create_tcp() を実装する際は、上記の session-layer 識別を基盤として、以下の設計を採用すること。
- accept() 後の先読みで得た session triplet を peer_create_tcp() / peer_lookup_by_session() の検索キーとして使用する
- 同一 session triplet の新たな path 接続は既存ピアへの追加パスとして扱う
- 異なる session triplet は新規ピアとして扱う (peer_create_tcp() を呼ぶ)
- per-peer の session_establish_mutex は各 PotrPeerContext に持たせ、ピア間の競合を防ぐ
7. 変更ファイル一覧
8. 実装順序
以下の順序で実装することで、各ステップでビルドを通しながら進められる。
- 型定義 (porter_type.h, potrContext.h)
- ピアテーブル拡張 (potrPeerTable.c)
- per-peer スレッド実装 (potrTcpPeerThread.c)
- recv スレッド関数を実装 (パケット処理は既存ロジックを流用)
- health スレッド関数を実装
- → ビルド確認
- N:1 accept ループ (potrConnectThread.c)
- 送信スレッド拡張 (potrSendThread.c)
- API 対応 (potrOpenService.c, potrCloseService.c, potrSend.c, potrDisconnectPeer.c)
- 各 API に TCP N:1 の case / 分岐を追加
- → ビルド確認
9. 動作確認方法
ビルド
既存 1:1 TCP の回帰確認
既存の POTR_TYPE_TCP / POTR_TYPE_TCP_BIDIR を使用した接続が引き続き正常動作することを確認する。
N:1 動作確認
# RECEIVER 起動 (POTR_TYPE_TCP_N1)
./recv --service-id 1 # 設定ファイルで type=TCP_N1 を指定
# SENDER A 接続 (POTR_TYPE_TCP)
./send --service-id 1 --data "hello from A"
# SENDER B 接続 (別プロセス)
./send --service-id 1 --data "hello from B"
# 期待動作:
# - RECEIVER の callback が peer_id=A で "hello from A" を受信
# - RECEIVER の callback が peer_id=B で "hello from B" を受信
# - A が切断しても B の接続は維持される
# - A 切断時に callback(POTR_EVENT_DISCONNECTED, peer_id=A) が発火
BIDIR_N1 の返信確認
# RECEIVER から peer_id=A への返信
potrSend(handle, peer_id_A, "response to A", len, 0);
クリーンシャットダウン確認
potrCloseService() 実行後にスレッドリーク・メモリリークがないことを valgrind または ThreadSanitizer で確認する。