25 #include <sys/socket.h>
26 #include <sys/select.h>
27 #include <netinet/in.h>
28 #include <arpa/inet.h>
66 clock_gettime(CLOCK_MONOTONIC, &ts);
67 return (uint64_t)ts.tv_sec * 1000ULL + (uint64_t)ts.tv_nsec / 1000000ULL;
69 return (uint64_t)GetTickCount64();
93 struct timespec abs_ts;
94 clock_gettime(CLOCK_REALTIME, &abs_ts);
95 abs_ts.tv_sec += (time_t)(wait_ms / 1000U);
96 abs_ts.tv_nsec += (long)((wait_ms % 1000U) * 1000000UL);
97 if (abs_ts.tv_nsec >= 1000000000L)
100 abs_ts.tv_nsec -= 1000000000L;
134 r = (int)recv(fd, (
char *)(buf + received), n - received, 0);
135 if (r < 0)
return -1;
136 if (r == 0)
return 0;
138 r = recv(fd, (
char *)(buf + received), (
int)(n - received), 0);
139 if (r == SOCKET_ERROR)
return -1;
140 if (r == 0)
return 0;
142 received += (size_t)r;
158 tv.tv_sec = (time_t)(wait_ms / 1000U);
159 tv.tv_usec = (suseconds_t)((wait_ms % 1000U) * 1000U);
160 ret = select(fd + 1, &rfds, NULL, NULL, &tv);
161 if (ret < 0 && errno == EINTR)
return 0;
162 if (ret < 0)
return -1;
168 tv.tv_sec = (long)(wait_ms / 1000U);
169 tv.tv_usec = (long)((wait_ms % 1000U) * 1000U);
170 ret = select(0, &rfds, NULL, NULL, &tv);
171 if (ret == SOCKET_ERROR)
return -1;
173 return (ret > 0) ? 1 : 0;
180 size_t *out_len, uint32_t timeout_ms)
183 uint16_t wire_payload_len;
188 if (ready == 0)
return 0;
189 if (ready < 0)
return -1;
193 if (r <= 0)
return -1;
198 memcpy(&wpl, buf + 30,
sizeof(wpl));
199 wire_payload_len = ntohs(wpl);
206 if (wire_payload_len > 0)
209 if (r <= 0)
return -1;
217#define TCP_SESSION_NEW ( 1)
218#define TCP_SESSION_SAME ( 0)
219#define TCP_SESSION_OLD (-1)
247 WaitForSingleObject(ctx->
recv_thread[path_idx], INFINITE);
283 "connect_thread[service_id=%" PRId64
"]: DISCONNECTED (all paths down)",
319 "connect_thread[service_id=%" PRId64
"]: send_thread_start failed",
329 "connect_thread[service_id=%" PRId64
"]: tcp_recv_thread_start failed"
341 if (is_sender || is_bidir)
346 "connect_thread[service_id=%" PRId64
"]: tcp_health_thread_start failed"
368 if (is_sender || is_bidir)
384 struct sockaddr_in addr;
388 sock = socket(AF_INET, SOCK_STREAM, 0);
392 "connect_thread[service_id=%" PRId64
"]: socket() failed",
398 setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &reuse,
sizeof(reuse));
400 setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
401 (
const char *)&reuse,
sizeof(reuse));
406 struct sockaddr_in bind_addr;
407 memset(&bind_addr, 0,
sizeof(bind_addr));
408 bind_addr.sin_family = AF_INET;
415 bind_addr.sin_addr.s_addr = htonl(INADDR_ANY);
420 if (bind(sock, (
struct sockaddr *)&bind_addr,
sizeof(bind_addr)) < 0)
422 if (bind(sock, (
struct sockaddr *)&bind_addr,
sizeof(bind_addr)) == SOCKET_ERROR)
426 "connect_thread[service_id=%" PRId64
"]: bind() failed",
437 memset(&addr, 0,
sizeof(addr));
438 addr.sin_family = AF_INET;
442 if (timeout_ms == 0U)
445 if (connect(sock, (
struct sockaddr *)&addr,
sizeof(addr)) < 0)
448 "connect_thread[service_id=%" PRId64
"]: connect() failed (blocking)",
469 flags = fcntl(sock, F_GETFL, 0);
470 fcntl(sock, F_SETFL, flags | O_NONBLOCK);
472 ret = connect(sock, (
struct sockaddr *)&addr,
sizeof(addr));
476 fcntl(sock, F_SETFL, flags);
479 if (errno != EINPROGRESS)
482 "connect_thread[service_id=%" PRId64
"]: connect() failed (errno=%d)",
490 uint32_t elapsed_ms = 0U;
493 while (elapsed_ms < timeout_ms && ctx->connect_thread_running[path_idx])
499 poll_ms = timeout_ms - elapsed_ms;
500 if (poll_ms > 100U) poll_ms = 100U;
503 FD_SET(sock, &writefds);
504 tv.tv_sec = (long)(poll_ms / 1000U);
505 tv.tv_usec = (long)((poll_ms % 1000U) * 1000L);
507 ret = select(sock + 1, NULL, &writefds, NULL, &tv);
510 if (errno == EINTR) { elapsed_ms += poll_ms;
continue; }
518 elapsed_ms += poll_ms;
524 "connect_thread[service_id=%" PRId64
"]: connect() timed out",
531 errlen = (socklen_t)
sizeof(error);
532 getsockopt(sock, SOL_SOCKET, SO_ERROR, &error, &errlen);
536 "connect_thread[service_id=%" PRId64
"]: connect() SO_ERROR=%d",
543 fcntl(sock, F_SETFL, flags);
553 ioctlsocket(sock, FIONBIO, &mode);
555 ret = connect(sock, (
struct sockaddr *)&addr,
sizeof(addr));
560 ioctlsocket(sock, FIONBIO, &mode);
563 if (WSAGetLastError() != WSAEWOULDBLOCK)
566 "connect_thread[service_id=%" PRId64
"]: connect() failed (WSA error)",
574 uint32_t elapsed_ms = 0U;
577 while (elapsed_ms < timeout_ms && ctx->connect_thread_running[path_idx])
583 poll_ms = timeout_ms - elapsed_ms;
584 if (poll_ms > 100U) poll_ms = 100U;
587 FD_SET(sock, &writefds);
588 tv.tv_sec = (long)(poll_ms / 1000U);
589 tv.tv_usec = (long)((poll_ms % 1000U) * 1000L);
591 ret = select(0, NULL, &writefds, NULL, &tv);
596 if (ret > 0 && FD_ISSET(sock, &writefds))
601 elapsed_ms += poll_ms;
607 "connect_thread[service_id=%" PRId64
"]: connect() timed out",
614 errlen = (int)
sizeof(error);
615 getsockopt(sock, SOL_SOCKET, SO_ERROR, (
char *)&error, &errlen);
619 "connect_thread[service_id=%" PRId64
"]: connect() SO_ERROR=%d",
627 ioctlsocket(sock, FIONBIO, &mode);
636 int is_reconnect = 0;
644 "connect_thread[service_id=%" PRId64
" path=%d]: connecting to %s:%u ...",
658 "connect_thread[service_id=%" PRId64
" path=%d]: connect failed, "
659 "no reconnect (reconnect_interval_ms=0)",
665 "connect_thread[service_id=%" PRId64
" path=%d]: connect failed, "
674 "connect_thread[service_id=%" PRId64
" path=%d]: TCP connected",
696 if (is_reconnect && path_idx == 0)
713 if (active_count == 0)
730 "connect_thread[service_id=%" PRId64
" path=%d]: TCP disconnected",
746 if (active_count == 0)
756 "connect_thread[service_id=%" PRId64
" path=%d]: no reconnect "
757 "(reconnect_interval_ms=0)",
763 "connect_thread[service_id=%" PRId64
" path=%d]: waiting %ums before reconnect",
786 int is_reconnect = 0;
796 struct sockaddr_in peer_addr;
797 socklen_t peer_len = (socklen_t)
sizeof(peer_addr);
799 char peer_addr_str[INET_ADDRSTRLEN];
803 (
struct sockaddr *)&peer_addr, &peer_len);
810 "connect_thread[service_id=%" PRId64
" path=%d]: accept() error, retrying",
815 inet_ntop(AF_INET, &peer_addr.sin_addr, peer_addr_str,
sizeof(peer_addr_str));
822 if (peer_addr.sin_addr.s_addr !=
838 "connect_thread[service_id=%" PRId64
" path=%d]: rejected connection"
839 " from %s:%u (src filter)",
842 (
unsigned)ntohs(peer_addr.sin_port));
853 "connect_thread[service_id=%" PRId64
" path=%d]: TCP accepted from %s:%u",
856 (
unsigned)ntohs(peer_addr.sin_port));
868 first_pkt_timeout_ms);
873 "connect_thread[service_id=%" PRId64
" path=%d]: "
874 "first packet read failed (r=%d), closing",
888 "connect_thread[service_id=%" PRId64
" path=%d]: "
889 "first packet parse failed, closing",
917 "connect_thread[service_id=%" PRId64
" path=%d]: "
918 "old session rejected (known_id=%u pkt_id=%u)",
934 for (k = 0; k < ctx->
n_path; k++)
936 if (k == path_idx)
continue;
973 if (is_bidir && session_result ==
TCP_SESSION_NEW && is_reconnect && path_idx == 0)
989 if (active_count == 0)
1003 "connect_thread[service_id=%" PRId64
" path=%d]: TCP connection closed",
1022 if (active_count == 0)
1044 "connect_thread[service_id=%" PRId64
" path=%d]: started (role=%s type=%s)",
1061 "connect_thread[service_id=%" PRId64
" path=%d]: exited",
1094 "connect_thread[service_id=%" PRId64
"]: starting %d path(s)",
1106 for (i = 0; i < ctx->
n_path; i++)
1115 "connect_thread[service_id=%" PRId64
"]: "
1116 "tcp_first_pkt_buf[%d] malloc failed",
1119 for (j = 0; j < i; j++)
1134 for (i = 0; i < ctx->
n_path; i++)
1146 "connect_thread[service_id=%" PRId64
" path=%d]: pthread_create failed",
1157 "connect_thread[service_id=%" PRId64
" path=%d]: CreateThread failed",
1186 int any_running = 0;
1188 if (ctx == NULL) {
return; }
1190 for (i = 0; i < ctx->
n_path; i++)
1194 if (!any_running) {
return; }
1197 for (i = 0; i < ctx->
n_path; i++)
1216 for (i = 0; i < ctx->
n_path; i++)
1230 for (i = 0; i < ctx->
n_path; i++)
1245 for (i = 0; i < ctx->
n_path; i++)
1250 for (i = 0; i < ctx->
n_path; i++)
1265 for (i = 0; i < ctx->
n_path; i++)
1282 "connect_thread[service_id=%" PRId64
"]: all paths stopped",
#define POTR_MAX_PATH
マルチパスの最大パス数。
#define POTR_PEER_NA
ピア ID 未割当を示す予約値。 1:1 モードのコールバックで渡される (ピアの概念がない)。 potrSend() に N:1 モードで指定した場合はエラーを返す。
#define POTR_SUCCESS
成功の戻り値を表す定数。
#define POTR_ERROR
失敗の戻り値を表す定数。
int packet_parse(PotrPacket *packet, const void *buf, size_t buf_len)
受信バイト列をパケット構造体に解析します。
#define PACKET_HEADER_SIZE
パケットヘッダーの固定長 (バイト)。payload フィールドの開始オフセット。
@ POTR_TRACE_ERROR
エラー。操作の失敗を記録。TRACE_LV_ERROR (1) と同値。
@ POTR_TRACE_INFO
情報。TRACE_LV_INFO (3) と同値。
@ POTR_TRACE_VERBOSE
詳細情報 (デバッグ)。TRACE_LV_VERBOSE (4) と同値。
@ POTR_TRACE_WARNING
警告。回復可能な異常を記録。TRACE_LV_WARNING (2) と同値。
@ POTR_TYPE_TCP_BIDIR
TCP 双方向通信 (両端が potrSend 可)。
@ POTR_EVENT_DISCONNECTED
切断を検知 (タイムアウト / FIN 受信 / REJECT 受信)。data=NULL, len=0。
static PotrSocket tcp_connect_with_timeout(struct PotrContext_ *ctx, int path_idx)
void potr_connect_thread_stop(struct PotrContext_ *ctx)
TCP 接続管理スレッドを停止します。
static void * connect_thread_func(void *arg)
static void reset_send_queue(struct PotrContext_ *ctx)
static void join_recv_thread(struct PotrContext_ *ctx, int path_idx)
static void close_tcp_conn(struct PotrContext_ *ctx, int path_idx)
static void receiver_accept_loop(struct PotrContext_ *ctx, int path_idx)
static void reconnect_wait(struct PotrContext_ *ctx, int path_idx, uint32_t wait_ms)
static void reset_all_paths_disconnected(struct PotrContext_ *ctx)
static int tcp_read_first_packet(PotrSocket fd, uint8_t *buf, size_t max_buf, size_t *out_len, uint32_t timeout_ms)
static uint64_t connect_get_ms(void)
static void reset_connection_state(struct PotrContext_ *ctx)
static int start_connected_threads(struct PotrContext_ *ctx, int path_idx)
static int accept_tcp_wait_readable(PotrSocket fd, uint32_t wait_ms)
static ConnectArg s_connect_args[POTR_MAX_PATH]
static void stop_connected_threads(struct PotrContext_ *ctx, int path_idx)
static void sender_connect_loop(struct PotrContext_ *ctx, int path_idx)
static int accept_tcp_read_all(PotrSocket fd, uint8_t *buf, size_t n)
int potr_connect_thread_start(struct PotrContext_ *ctx)
TCP 接続管理スレッドを起動します (path 数分)。
static int tcp_session_compare(const struct PotrContext_ *ctx, const PotrPacket *pkt)
#define POTR_INVALID_SOCKET
int potr_tcp_health_thread_stop(struct PotrContext_ *ctx, int path_idx)
TCP ヘルスチェックスレッドを停止します。
int potr_tcp_health_thread_start(struct PotrContext_ *ctx, int path_idx)
TCP ヘルスチェックスレッドを path ごとに起動します。
#define POTR_LOG(level,...)
porter 内部ログ出力マクロ。
int tcp_recv_thread_start(struct PotrContext_ *ctx, int path_idx)
TCP 受信スレッドを path ごとに起動します。
int potr_send_queue_init(PotrSendQueue *q, size_t depth, uint16_t max_payload)
void potr_send_queue_destroy(PotrSendQueue *q)
int potr_send_thread_start(struct PotrContext_ *ctx)
void potr_send_thread_stop(struct PotrContext_ *ctx)
struct PotrContext_ * ctx
セッションコンテキスト構造体。PotrHandle の実体。
volatile int tcp_active_paths
アクティブ TCP path 数 (0 = 全切断)。
PotrRecvCallback callback
受信コールバック。
volatile uint64_t tcp_last_ping_req_recv_ms[POTR_MAX_PATH]
TCP PING 要求最終受信時刻 (ms, CLOCK_MONOTONIC 基準)。path ごと。接続確立時に現在時刻で初期化。
PotrGlobalConfig global
グローバル設定。
volatile uint64_t tcp_last_ping_recv_ms[POTR_MAX_PATH]
TCP PING 応答最終受信時刻 (ms, CLOCK_MONOTONIC 基準)。path ごと。接続確立時に現在時刻で初期化。
struct in_addr dst_addr_resolved[POTR_MAX_PATH]
解決済み宛先 IPv4 アドレス (unicast のみ)。
int64_t peer_session_tv_sec
追跡中の相手セッション開始時刻 秒部。
int32_t peer_session_tv_nsec
追跡中の相手セッション開始時刻 ナノ秒部。
struct in_addr src_addr_resolved[POTR_MAX_PATH]
解決済み送信元 IPv4 アドレス。
PotrRole role
役割 (POTR_ROLE_SENDER / POTR_ROLE_RECEIVER)。
PotrSendQueue send_queue
非同期送信キュー。
volatile int running[POTR_MAX_PATH]
受信スレッド実行フラグ (1: 実行中, 0: 停止)。path ごと。
volatile int health_alive
疎通状態 (1: alive, 0: dead/未接続)。UDP 用。受信者が管理。
size_t frag_buf_len
フラグメント結合バッファの現在のデータ長 (バイト)。
PotrThread recv_thread[POTR_MAX_PATH]
受信スレッドハンドル (path ごと)。
uint32_t peer_session_id
追跡中の相手セッション識別子。
size_t tcp_first_pkt_len[POTR_MAX_PATH]
先読みパケットのバイト数 (0: 先読みなし)。
PotrMutex tcp_state_mutex
tcp_state_cv 保護用ミューテックス。tcp_active_paths のカウンタ更新も保護。
PotrMutex session_establish_mutex
PotrSocket tcp_listen_sock[POTR_MAX_PATH]
RECEIVER: listen ソケット (path ごと)。
volatile int connect_thread_running[POTR_MAX_PATH]
connect スレッド実行フラグ (1: 実行中, 0: 停止)。path ごと。
PotrServiceDef service
サービス定義。
uint8_t * tcp_first_pkt_buf[POTR_MAX_PATH]
先読みパケットバッファ (動的確保、PACKET_HEADER_SIZE + max_payload バイト)。
int peer_session_known
相手セッションが初期化済みか (0: 未初期化)。
PotrThread connect_thread[POTR_MAX_PATH]
SENDER: connect スレッド。RECEIVER: accept スレッド。path ごと。
PotrWindow send_window
送信バッファ (過去 N パケット保持。NACK 再送・REJECT 判定に使用)。
volatile int send_thread_running
送信スレッド実行フラグ (1: 実行中, 0: 停止)。
PotrCondVar tcp_state_cv
切断通知・reconnect sleep の中断用条件変数。
PotrSocket tcp_conn_fd[POTR_MAX_PATH]
アクティブ TCP 接続 fd (path ごと)。
uint32_t tcp_health_timeout_ms
TCP 通信種別の PING 応答待機タイムアウト (ミリ秒)。SENDER 側で使用。0 = 無効。設定ファイルキー: tcp_health_timeout_ms。
uint16_t max_payload
最大ペイロード長 (バイト)。
int64_t session_tv_sec
セッション開始時刻 秒部 (NBO)。struct timespec の tv_sec 相当。
uint32_t session_id
セッション識別子 (NBO)。potrOpenService 時に決定する乱数。
int32_t session_tv_nsec
セッション開始時刻 ナノ秒部 (NBO)。struct timespec の tv_nsec 相当。
size_t depth
キュー容量 (エントリ数)。
uint16_t src_port
送信者の送信元 bind ポート番号。0 = OS 自動選定。(全通信種別で省略可)
uint16_t dst_port
宛先ポート番号。サービスの識別子。受信者の bind ポート / 送信者の送信先ポート。(全通信種別で必須)
char src_addr[POTR_MAX_PATH][POTR_MAX_ADDR_LEN]
送信元アドレス [0]=src_addr1 〜 [3]=src_addr4。送信者は bind / 送信インターフェース、受信者は送信元フィルタ。(全通信種別で必須)
int64_t service_id
サービス ID。
uint32_t connect_timeout_ms
SENDER TCP 接続タイムアウト (ms)。0 = OS デフォルト。デフォルト: POTR_DEFAULT_CONNECT_TIMEOUT_MS。
char dst_addr[POTR_MAX_PATH][POTR_MAX_ADDR_LEN]
宛先アドレス [0]=dst_addr1 〜 [3]=dst_addr4。送信者は送信先、受信者は bind アドレス。(unicast のみ)
uint32_t reconnect_interval_ms
SENDER 自動再接続間隔 (ms)。0 = 再接続なし。デフォルト: POTR_DEFAULT_RECONNECT_INTERVAL_MS。
uint32_t base_seq
ウィンドウ先頭の通番。
uint32_t next_seq
送信側: 次に割り当てる通番。受信側: 次に期待する通番。
uint8_t * valid
バッファ有効フラグ配列 (動的確保。window_size バイト)。
uint16_t window_size
ウィンドウサイズ (パケット数)。
スライディングウィンドウ管理モジュールの内部ヘッダー。