Document of c-modernization-kit (porter) 1.0.0
Loading...
Searching...
No Matches
potrConnectThread.c
Go to the documentation of this file.
1
19
20#include <stdlib.h>
21#include <inttypes.h>
22#include <string.h>
23
24#ifndef _WIN32
25 #include <sys/socket.h>
26 #include <sys/select.h>
27 #include <netinet/in.h>
28 #include <arpa/inet.h>
29 #include <unistd.h>
30 #include <fcntl.h>
31 #include <errno.h>
32 #include <time.h>
33#else /* _WIN32 */
34 #include <winsock2.h>
35 #include <ws2tcpip.h>
36#endif /* _WIN32 */
37
38#include <porter_const.h>
39
40#include "../potrContext.h"
41#include "../protocol/packet.h"
43#include "../protocol/window.h"
44#include "../infra/potrLog.h"
45#include "potrConnectThread.h"
46#include "potrRecvThread.h"
47#include "potrSendThread.h"
48#include "potrHealthThread.h"
49
50/* connect/accept スレッドに渡す引数 */
51typedef struct
52{
55 int _pad;
57
58/* 静的に確保した引数バッファ (path 数分) */
60
61/* 現在時刻をミリ秒単位で返す (単調増加クロック) */
62static uint64_t connect_get_ms(void)
63{
64#ifndef _WIN32
65 struct timespec ts;
66 clock_gettime(CLOCK_MONOTONIC, &ts);
67 return (uint64_t)ts.tv_sec * 1000ULL + (uint64_t)ts.tv_nsec / 1000000ULL;
68#else /* _WIN32 */
69 return (uint64_t)GetTickCount64();
70#endif /* _WIN32 */
71}
72
73/* TCP 接続ソケット [path_idx] をシャットダウン・クローズして INVALID にする */
74static void close_tcp_conn(struct PotrContext_ *ctx, int path_idx)
75{
76 if (ctx->tcp_conn_fd[path_idx] != POTR_INVALID_SOCKET)
77 {
78#ifndef _WIN32
79 shutdown(ctx->tcp_conn_fd[path_idx], SHUT_RDWR);
80 close(ctx->tcp_conn_fd[path_idx]);
81#else /* _WIN32 */
82 shutdown(ctx->tcp_conn_fd[path_idx], SD_BOTH);
83 closesocket(ctx->tcp_conn_fd[path_idx]);
84#endif /* _WIN32 */
85 ctx->tcp_conn_fd[path_idx] = POTR_INVALID_SOCKET;
86 }
87}
88
89/* 再接続待機: reconnect_interval_ms 経過または停止シグナルまでスリープする */
90static void reconnect_wait(struct PotrContext_ *ctx, int path_idx, uint32_t wait_ms)
91{
92#ifndef _WIN32
93 struct timespec abs_ts;
94 clock_gettime(CLOCK_REALTIME, &abs_ts);
95 abs_ts.tv_sec += (time_t)(wait_ms / 1000U);
96 abs_ts.tv_nsec += (long)((wait_ms % 1000U) * 1000000UL);
97 if (abs_ts.tv_nsec >= 1000000000L)
98 {
99 abs_ts.tv_sec++;
100 abs_ts.tv_nsec -= 1000000000L;
101 }
102 pthread_mutex_lock(&ctx->tcp_state_mutex);
103 if (ctx->connect_thread_running[path_idx])
104 {
105 pthread_cond_timedwait(&ctx->tcp_state_cv, &ctx->tcp_state_mutex, &abs_ts);
106 }
107 pthread_mutex_unlock(&ctx->tcp_state_mutex);
108#else /* _WIN32 */
109 EnterCriticalSection(&ctx->tcp_state_mutex);
110 if (ctx->connect_thread_running[path_idx])
111 {
112 SleepConditionVariableCS(&ctx->tcp_state_cv,
113 &ctx->tcp_state_mutex,
114 (DWORD)wait_ms);
115 }
116 LeaveCriticalSection(&ctx->tcp_state_mutex);
117#endif /* _WIN32 */
118}
119
120/* ================================================================
121 * TCP セッション識別ヘルパー (accept スレッド専用)
122 * ================================================================ */
123
124/* TCP ソケットから正確に n バイト読み取る。
125 * accept スレッド専用。potrRecvThread.c の tcp_read_all と同一実装。
126 * 戻り値: 1 = 成功、0 = 切断 (recv が 0)、-1 = エラー。 */
127static int accept_tcp_read_all(PotrSocket fd, uint8_t *buf, size_t n)
128{
129 size_t received = 0;
130 while (received < n)
131 {
132 int r;
133#ifndef _WIN32
134 r = (int)recv(fd, (char *)(buf + received), n - received, 0);
135 if (r < 0) return -1;
136 if (r == 0) return 0;
137#else /* _WIN32 */
138 r = recv(fd, (char *)(buf + received), (int)(n - received), 0);
139 if (r == SOCKET_ERROR) return -1;
140 if (r == 0) return 0;
141#endif /* _WIN32 */
142 received += (size_t)r;
143 }
144 return 1;
145}
146
147/* TCP ソケットが読み取り可能になるまで最大 wait_ms ミリ秒待機する。
148 * accept スレッド専用。potrRecvThread.c の tcp_wait_readable と同一実装。
149 * 戻り値: 1 = データあり、0 = タイムアウト、-1 = エラー。 */
150static int accept_tcp_wait_readable(PotrSocket fd, uint32_t wait_ms)
151{
152 int ret;
153#ifndef _WIN32
154 fd_set rfds;
155 struct timeval tv;
156 FD_ZERO(&rfds);
157 FD_SET(fd, &rfds);
158 tv.tv_sec = (time_t)(wait_ms / 1000U);
159 tv.tv_usec = (suseconds_t)((wait_ms % 1000U) * 1000U);
160 ret = select(fd + 1, &rfds, NULL, NULL, &tv);
161 if (ret < 0 && errno == EINTR) return 0;
162 if (ret < 0) return -1;
163#else /* _WIN32 */
164 fd_set rfds;
165 struct timeval tv;
166 FD_ZERO(&rfds);
167 FD_SET(fd, &rfds);
168 tv.tv_sec = (long)(wait_ms / 1000U);
169 tv.tv_usec = (long)((wait_ms % 1000U) * 1000U);
170 ret = select(0, &rfds, NULL, NULL, &tv);
171 if (ret == SOCKET_ERROR) return -1;
172#endif /* _WIN32 */
173 return (ret > 0) ? 1 : 0;
174}
175
176/* accept 直後の TCP ソケットから 1 パケット分を buf に読み取る。
177 * buf は PACKET_HEADER_SIZE + max_payload バイト以上確保されていること。
178 * 戻り値: 1 = 成功 (*out_len にバイト数を格納)、0 = タイムアウト、-1 = EOF/エラー/不正。 */
179static int tcp_read_first_packet(PotrSocket fd, uint8_t *buf, size_t max_buf,
180 size_t *out_len, uint32_t timeout_ms)
181{
182 int ready;
183 uint16_t wire_payload_len;
184 int r;
185
186 /* タイムアウト付き待機 */
187 ready = accept_tcp_wait_readable(fd, timeout_ms);
188 if (ready == 0) return 0; /* タイムアウト */
189 if (ready < 0) return -1; /* エラー */
190
191 /* ヘッダー 32B 読み取り */
193 if (r <= 0) return -1;
194
195 /* ペイロード長を NBO で取得 (offset 30) */
196 {
197 uint16_t wpl;
198 memcpy(&wpl, buf + 30, sizeof(wpl));
199 wire_payload_len = ntohs(wpl);
200 }
201
202 /* ペイロード長バリデーション */
203 if (PACKET_HEADER_SIZE + (size_t)wire_payload_len > max_buf) return -1;
204
205 /* ペイロード読み取り */
206 if (wire_payload_len > 0)
207 {
208 r = accept_tcp_read_all(fd, buf + PACKET_HEADER_SIZE, (size_t)wire_payload_len);
209 if (r <= 0) return -1;
210 }
211
212 *out_len = PACKET_HEADER_SIZE + (size_t)wire_payload_len;
213 return 1;
214}
215
216/* session triplet 比較の戻り値 */
217#define TCP_SESSION_NEW ( 1) /* 新セッション (または初回接続) */
218#define TCP_SESSION_SAME ( 0) /* 同一セッション */
219#define TCP_SESSION_OLD (-1) /* 旧セッション (破棄すべき) */
220
221/* ctx に記録されている相手セッションと pkt のセッション triplet を比較する。
222 * peer_session_known == 0 の場合は TCP_SESSION_NEW を返す。
223 * 呼び出し前提: session_establish_mutex を取得済みであること。 */
224static int tcp_session_compare(const struct PotrContext_ *ctx,
225 const PotrPacket *pkt)
226{
227 if (!ctx->peer_session_known) return TCP_SESSION_NEW;
228
229 if (pkt->session_tv_sec > ctx->peer_session_tv_sec) return TCP_SESSION_NEW;
230 else if (pkt->session_tv_sec < ctx->peer_session_tv_sec) return TCP_SESSION_OLD;
231 else if (pkt->session_tv_nsec > ctx->peer_session_tv_nsec) return TCP_SESSION_NEW;
232 else if (pkt->session_tv_nsec < ctx->peer_session_tv_nsec) return TCP_SESSION_OLD;
233 else if (pkt->session_id > ctx->peer_session_id) return TCP_SESSION_NEW;
234 else if (pkt->session_id < ctx->peer_session_id) return TCP_SESSION_OLD;
235 return TCP_SESSION_SAME;
236}
237
238/* recv スレッド [path_idx] の自然終了を待機してハンドルを解放する。
239 TCP 接続断後 recv スレッドが自然終了する設計のため、ソケットはクローズしない。 */
240static void join_recv_thread(struct PotrContext_ *ctx, int path_idx)
241{
242#ifndef _WIN32
243 pthread_join(ctx->recv_thread[path_idx], NULL);
244#else /* _WIN32 */
245 if (ctx->recv_thread[path_idx] != NULL)
246 {
247 WaitForSingleObject(ctx->recv_thread[path_idx], INFINITE);
248 CloseHandle(ctx->recv_thread[path_idx]);
249 ctx->recv_thread[path_idx] = NULL;
250 }
251#endif /* _WIN32 */
252}
253
254/* 接続確立前のコンテキスト状態をリセットする。
255 フラグメントバッファや peer_session 状態をクリアする。
256 TCP v2 マルチパスでは send_window 通番と health_alive は保持する
257 (部分切断・再接続時のセッション継続のため)。
258 呼び出しタイミング: start_connected_threads 前 (他スレッド未起動)。 */
259static void reset_connection_state(struct PotrContext_ *ctx)
260{
261 ctx->peer_session_known = 0;
262 ctx->frag_buf_len = 0;
263}
264
265/* 全 path 切断時のリセット: send_window 通番・peer_session 状態・health_alive をクリアし
266 DISCONNECTED イベントを発火する。
267 呼び出しタイミング: tcp_active_paths が 0 になった直後 (tcp_state_mutex 非保護)。 */
269{
270 ctx->peer_session_known = 0;
271 ctx->frag_buf_len = 0;
272 ctx->send_window.next_seq = 0U;
273 ctx->send_window.base_seq = 0U;
274 if (ctx->send_window.valid != NULL)
275 {
276 memset(ctx->send_window.valid, 0,
277 (size_t)ctx->send_window.window_size * sizeof(uint8_t));
278 }
279 if (ctx->health_alive)
280 {
281 ctx->health_alive = 0;
283 "connect_thread[service_id=%" PRId64 "]: DISCONNECTED (all paths down)",
284 ctx->service.service_id);
285 if (ctx->callback != NULL)
286 {
288 POTR_EVENT_DISCONNECTED, NULL, 0);
289 }
290 }
291}
292
293/* 送信キューを再初期化する (reconnect 時に shutdown 済みのキューをリセットする)。
294 depth と max_payload はキュー構造体から取得する。 */
295static void reset_send_queue(struct PotrContext_ *ctx)
296{
297 size_t depth = ctx->send_queue.depth;
298 uint16_t max_payload = (uint16_t)ctx->global.max_payload;
300 (void)potr_send_queue_init(&ctx->send_queue, depth, max_payload);
301}
302
303/* 接続確立後に依存スレッドを起動する (path ごと)。
304 SENDER および TCP_BIDIR RECEIVER: path_idx==0 の初回のみ send スレッドを起動する。
305 各 path で recv スレッドと health スレッドを起動。
306 TCP RECEIVER: recv スレッドのみ起動。
307 失敗時は起動済みスレッドをすべて停止してから POTR_ERROR を返す。 */
308static int start_connected_threads(struct PotrContext_ *ctx, int path_idx)
309{
310 int is_bidir = (ctx->service.type == POTR_TYPE_TCP_BIDIR);
311 int is_sender = (ctx->role == POTR_ROLE_SENDER);
312
313 /* SENDER または TCP_BIDIR RECEIVER: path[0] の初回接続時のみ送信スレッドを起動 */
314 if ((is_sender || is_bidir) && path_idx == 0 && !ctx->send_thread_running)
315 {
317 {
319 "connect_thread[service_id=%" PRId64 "]: send_thread_start failed",
320 ctx->service.service_id);
321 return POTR_ERROR;
322 }
323 }
324
325 /* recv スレッドを path ごとに起動 */
326 if (tcp_recv_thread_start(ctx, path_idx) != POTR_SUCCESS)
327 {
329 "connect_thread[service_id=%" PRId64 "]: tcp_recv_thread_start failed"
330 " (path=%d)",
331 ctx->service.service_id, path_idx);
332 if ((is_sender || is_bidir) && path_idx == 0 && !ctx->send_thread_running)
333 {
334 close_tcp_conn(ctx, path_idx);
336 }
337 return POTR_ERROR;
338 }
339
340 /* SENDER または TCP_BIDIR RECEIVER: health スレッドを path ごとに起動 */
341 if (is_sender || is_bidir)
342 {
343 if (potr_tcp_health_thread_start(ctx, path_idx) != POTR_SUCCESS)
344 {
346 "connect_thread[service_id=%" PRId64 "]: tcp_health_thread_start failed"
347 " (path=%d)",
348 ctx->service.service_id, path_idx);
349 /* recv スレッドをソケットクローズで自然終了させる */
350 ctx->running[path_idx] = 0;
351 close_tcp_conn(ctx, path_idx);
352 join_recv_thread(ctx, path_idx);
353 return POTR_ERROR;
354 }
355 }
356
357 return POTR_SUCCESS;
358}
359
360/* 接続断後に依存スレッドを停止する (path ごと)。
361 呼び出し前提: join_recv_thread(path_idx) 完了済み (recv スレッドは終了している)。
362 注意: 送信スレッドは共有のため、potr_connect_thread_stop で全 path join 後に停止する。 */
363static void stop_connected_threads(struct PotrContext_ *ctx, int path_idx)
364{
365 int is_bidir = (ctx->service.type == POTR_TYPE_TCP_BIDIR);
366 int is_sender = (ctx->role == POTR_ROLE_SENDER);
367
368 if (is_sender || is_bidir)
369 {
370 /* health スレッドを先に停止 (PING 送信が tcp_conn_fd を参照するため) */
371 potr_tcp_health_thread_stop(ctx, path_idx);
372 }
373
374 /* 接続ソケットをクローズ */
375 close_tcp_conn(ctx, path_idx);
376}
377
378/* SENDER: path_idx 番目の宛先へ TCP 接続を試みる。
379 connect_timeout_ms が 0 の場合は OS デフォルト (ブロッキング) で接続する。
380 成功時はソケットを返す。失敗時は POTR_INVALID_SOCKET を返す。 */
381static PotrSocket tcp_connect_with_timeout(struct PotrContext_ *ctx, int path_idx)
382{
383 PotrSocket sock;
384 struct sockaddr_in addr;
385 uint32_t timeout_ms = ctx->service.connect_timeout_ms;
386 int reuse = 1;
387
388 sock = socket(AF_INET, SOCK_STREAM, 0);
389 if (sock == POTR_INVALID_SOCKET)
390 {
392 "connect_thread[service_id=%" PRId64 "]: socket() failed",
393 ctx->service.service_id);
394 return POTR_INVALID_SOCKET;
395 }
396
397#ifndef _WIN32
398 setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
399#else /* _WIN32 */
400 setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
401 (const char *)&reuse, sizeof(reuse));
402#endif /* _WIN32 */
403
404 if (ctx->service.src_addr[path_idx][0] != '\0' || ctx->service.src_port != 0)
405 {
406 struct sockaddr_in bind_addr;
407 memset(&bind_addr, 0, sizeof(bind_addr));
408 bind_addr.sin_family = AF_INET;
409 if (ctx->service.src_addr[path_idx][0] != '\0')
410 {
411 bind_addr.sin_addr = ctx->src_addr_resolved[path_idx];
412 }
413 else
414 {
415 bind_addr.sin_addr.s_addr = htonl(INADDR_ANY);
416 }
417 bind_addr.sin_port = htons(ctx->service.src_port); /* 0 = エフェメラル */
418
419#ifndef _WIN32
420 if (bind(sock, (struct sockaddr *)&bind_addr, sizeof(bind_addr)) < 0)
421#else /* _WIN32 */
422 if (bind(sock, (struct sockaddr *)&bind_addr, sizeof(bind_addr)) == SOCKET_ERROR)
423#endif /* _WIN32 */
424 {
426 "connect_thread[service_id=%" PRId64 "]: bind() failed",
427 ctx->service.service_id);
428#ifndef _WIN32
429 close(sock);
430#else /* _WIN32 */
431 closesocket(sock);
432#endif /* _WIN32 */
433 return POTR_INVALID_SOCKET;
434 }
435 }
436
437 memset(&addr, 0, sizeof(addr));
438 addr.sin_family = AF_INET;
439 addr.sin_addr = ctx->dst_addr_resolved[path_idx];
440 addr.sin_port = htons(ctx->service.dst_port);
441
442 if (timeout_ms == 0U)
443 {
444 /* ブロッキング接続 */
445 if (connect(sock, (struct sockaddr *)&addr, sizeof(addr)) < 0)
446 {
448 "connect_thread[service_id=%" PRId64 "]: connect() failed (blocking)",
449 ctx->service.service_id);
450#ifndef _WIN32
451 close(sock);
452#else /* _WIN32 */
453 closesocket(sock);
454#endif /* _WIN32 */
455 return POTR_INVALID_SOCKET;
456 }
457 return sock;
458 }
459
460 /* タイムアウト付き接続: ノンブロッキングモードで select を使う。
461 停止シグナルに素早く応答するため 100ms 単位でポーリングする。 */
462#ifndef _WIN32
463 {
464 int flags;
465 int ret;
466 int error = 0;
467 socklen_t errlen;
468
469 flags = fcntl(sock, F_GETFL, 0);
470 fcntl(sock, F_SETFL, flags | O_NONBLOCK);
471
472 ret = connect(sock, (struct sockaddr *)&addr, sizeof(addr));
473 if (ret == 0)
474 {
475 /* 即座に接続成功 */
476 fcntl(sock, F_SETFL, flags);
477 return sock;
478 }
479 if (errno != EINPROGRESS)
480 {
482 "connect_thread[service_id=%" PRId64 "]: connect() failed (errno=%d)",
483 ctx->service.service_id, errno);
484 close(sock);
485 return POTR_INVALID_SOCKET;
486 }
487
488 /* select ループ (100ms ポーリング) */
489 {
490 uint32_t elapsed_ms = 0U;
491 int ready = 0;
492
493 while (elapsed_ms < timeout_ms && ctx->connect_thread_running[path_idx])
494 {
495 fd_set writefds;
496 struct timeval tv;
497 uint32_t poll_ms;
498
499 poll_ms = timeout_ms - elapsed_ms;
500 if (poll_ms > 100U) poll_ms = 100U;
501
502 FD_ZERO(&writefds);
503 FD_SET(sock, &writefds);
504 tv.tv_sec = (long)(poll_ms / 1000U);
505 tv.tv_usec = (long)((poll_ms % 1000U) * 1000L);
506
507 ret = select(sock + 1, NULL, &writefds, NULL, &tv);
508 if (ret < 0)
509 {
510 if (errno == EINTR) { elapsed_ms += poll_ms; continue; }
511 break;
512 }
513 if (ret > 0)
514 {
515 ready = 1;
516 break;
517 }
518 elapsed_ms += poll_ms;
519 }
520
521 if (!ready)
522 {
524 "connect_thread[service_id=%" PRId64 "]: connect() timed out",
525 ctx->service.service_id);
526 close(sock);
527 return POTR_INVALID_SOCKET;
528 }
529 }
530
531 errlen = (socklen_t)sizeof(error);
532 getsockopt(sock, SOL_SOCKET, SO_ERROR, &error, &errlen);
533 if (error != 0)
534 {
536 "connect_thread[service_id=%" PRId64 "]: connect() SO_ERROR=%d",
537 ctx->service.service_id, error);
538 close(sock);
539 return POTR_INVALID_SOCKET;
540 }
541
542 /* ブロッキングモードに戻す */
543 fcntl(sock, F_SETFL, flags);
544 return sock;
545 }
546#else /* _WIN32 */
547 {
548 u_long mode = 1;
549 int ret;
550 int error = 0;
551 int errlen;
552
553 ioctlsocket(sock, FIONBIO, &mode);
554
555 ret = connect(sock, (struct sockaddr *)&addr, sizeof(addr));
556 if (ret == 0)
557 {
558 /* 即座に接続成功 */
559 mode = 0;
560 ioctlsocket(sock, FIONBIO, &mode);
561 return sock;
562 }
563 if (WSAGetLastError() != WSAEWOULDBLOCK)
564 {
566 "connect_thread[service_id=%" PRId64 "]: connect() failed (WSA error)",
567 ctx->service.service_id);
568 closesocket(sock);
569 return POTR_INVALID_SOCKET;
570 }
571
572 /* select ループ (100ms ポーリング) */
573 {
574 uint32_t elapsed_ms = 0U;
575 int ready = 0;
576
577 while (elapsed_ms < timeout_ms && ctx->connect_thread_running[path_idx])
578 {
579 fd_set writefds;
580 struct timeval tv;
581 uint32_t poll_ms;
582
583 poll_ms = timeout_ms - elapsed_ms;
584 if (poll_ms > 100U) poll_ms = 100U;
585
586 FD_ZERO(&writefds);
587 FD_SET(sock, &writefds);
588 tv.tv_sec = (long)(poll_ms / 1000U);
589 tv.tv_usec = (long)((poll_ms % 1000U) * 1000L);
590
591 ret = select(0, NULL, &writefds, NULL, &tv);
592 if (ret < 0)
593 {
594 break;
595 }
596 if (ret > 0 && FD_ISSET(sock, &writefds))
597 {
598 ready = 1;
599 break;
600 }
601 elapsed_ms += poll_ms;
602 }
603
604 if (!ready)
605 {
607 "connect_thread[service_id=%" PRId64 "]: connect() timed out",
608 ctx->service.service_id);
609 closesocket(sock);
610 return POTR_INVALID_SOCKET;
611 }
612 }
613
614 errlen = (int)sizeof(error);
615 getsockopt(sock, SOL_SOCKET, SO_ERROR, (char *)&error, &errlen);
616 if (error != 0)
617 {
619 "connect_thread[service_id=%" PRId64 "]: connect() SO_ERROR=%d",
620 ctx->service.service_id, error);
621 closesocket(sock);
622 return POTR_INVALID_SOCKET;
623 }
624
625 /* ブロッキングモードに戻す */
626 mode = 0;
627 ioctlsocket(sock, FIONBIO, &mode);
628 return sock;
629 }
630#endif /* _WIN32 */
631}
632
633/* SENDER 用接続ループ (path ごと) */
634static void sender_connect_loop(struct PotrContext_ *ctx, int path_idx)
635{
636 int is_reconnect = 0; /* 初回接続フラグ */
637
638 while (ctx->connect_thread_running[path_idx])
639 {
640 PotrSocket sock;
641 int active_count;
642
644 "connect_thread[service_id=%" PRId64 " path=%d]: connecting to %s:%u ...",
645 ctx->service.service_id, path_idx,
646 ctx->service.dst_addr[path_idx],
647 (unsigned)ctx->service.dst_port);
648
649 sock = tcp_connect_with_timeout(ctx, path_idx);
650
651 if (sock == POTR_INVALID_SOCKET)
652 {
653 if (!ctx->connect_thread_running[path_idx]) break;
654
655 if (ctx->service.reconnect_interval_ms == 0U)
656 {
658 "connect_thread[service_id=%" PRId64 " path=%d]: connect failed, "
659 "no reconnect (reconnect_interval_ms=0)",
660 ctx->service.service_id, path_idx);
661 break;
662 }
663
665 "connect_thread[service_id=%" PRId64 " path=%d]: connect failed, "
666 "retrying in %ums",
667 ctx->service.service_id, path_idx,
668 (unsigned)ctx->service.reconnect_interval_ms);
669 reconnect_wait(ctx, path_idx, ctx->service.reconnect_interval_ms);
670 continue;
671 }
672
674 "connect_thread[service_id=%" PRId64 " path=%d]: TCP connected",
675 ctx->service.service_id, path_idx);
676
677 ctx->tcp_conn_fd[path_idx] = sock;
678 ctx->tcp_last_ping_recv_ms[path_idx] = connect_get_ms();
679 ctx->tcp_last_ping_req_recv_ms[path_idx] = connect_get_ms();
680
681 /* tcp_active_paths カウンタをインクリメント (tcp_state_mutex 保護) */
682#ifndef _WIN32
683 pthread_mutex_lock(&ctx->tcp_state_mutex);
684 active_count = ++ctx->tcp_active_paths;
685 pthread_mutex_unlock(&ctx->tcp_state_mutex);
686#else /* _WIN32 */
687 EnterCriticalSection(&ctx->tcp_state_mutex);
688 active_count = ++ctx->tcp_active_paths;
689 LeaveCriticalSection(&ctx->tcp_state_mutex);
690#endif /* _WIN32 */
691 (void)active_count; /* CONNECTED イベントは recv スレッドが最初のパケット受信時に発火 */
692
694
695 /* 再接続時 (path[0] のみ): 全 path 切断後の再起動ではキューをリセット */
696 if (is_reconnect && path_idx == 0)
697 {
698 reset_send_queue(ctx);
699 }
700
701 if (start_connected_threads(ctx, path_idx) != POTR_SUCCESS)
702 {
703 /* スレッド起動失敗: カウンタを戻す */
704#ifndef _WIN32
705 pthread_mutex_lock(&ctx->tcp_state_mutex);
706 active_count = --ctx->tcp_active_paths;
707 pthread_mutex_unlock(&ctx->tcp_state_mutex);
708#else /* _WIN32 */
709 EnterCriticalSection(&ctx->tcp_state_mutex);
710 active_count = --ctx->tcp_active_paths;
711 LeaveCriticalSection(&ctx->tcp_state_mutex);
712#endif /* _WIN32 */
713 if (active_count == 0)
714 {
716 }
717
718 if (!ctx->connect_thread_running[path_idx]) break;
719 if (ctx->service.reconnect_interval_ms == 0U) break;
720
721 reconnect_wait(ctx, path_idx, ctx->service.reconnect_interval_ms);
722 is_reconnect = 1;
723 continue;
724 }
725
726 /* recv スレッドが接続断を検知して自然終了するまで待機する */
727 join_recv_thread(ctx, path_idx);
728
730 "connect_thread[service_id=%" PRId64 " path=%d]: TCP disconnected",
731 ctx->service.service_id, path_idx);
732
733 stop_connected_threads(ctx, path_idx);
734
735 /* tcp_active_paths カウンタをデクリメント (tcp_state_mutex 保護) */
736#ifndef _WIN32
737 pthread_mutex_lock(&ctx->tcp_state_mutex);
738 active_count = --ctx->tcp_active_paths;
739 pthread_mutex_unlock(&ctx->tcp_state_mutex);
740#else /* _WIN32 */
741 EnterCriticalSection(&ctx->tcp_state_mutex);
742 active_count = --ctx->tcp_active_paths;
743 LeaveCriticalSection(&ctx->tcp_state_mutex);
744#endif /* _WIN32 */
745
746 if (active_count == 0)
747 {
748 /* 全 path 切断: send_window 通番と session をリセット */
750 }
751
752 if (!ctx->connect_thread_running[path_idx]) break;
753 if (ctx->service.reconnect_interval_ms == 0U)
754 {
756 "connect_thread[service_id=%" PRId64 " path=%d]: no reconnect "
757 "(reconnect_interval_ms=0)",
758 ctx->service.service_id, path_idx);
759 break;
760 }
761
763 "connect_thread[service_id=%" PRId64 " path=%d]: waiting %ums before reconnect",
764 ctx->service.service_id, path_idx,
765 (unsigned)ctx->service.reconnect_interval_ms);
766 reconnect_wait(ctx, path_idx, ctx->service.reconnect_interval_ms);
767 is_reconnect = 1;
768 }
769}
770
771/* RECEIVER 用 accept ループ (path ごと)
772 *
773 * [セッション層対称化]
774 * accept() 直後に最初の 1 パケットを先読みし session_id を取得する。
775 * session_establish_mutex 下で ctx の既知セッションと比較し、以下の 3 ケースを判別する。
776 * TCP_SESSION_NEW : 新セッション (初回 or SENDER 再起動)
777 * → 他 path の既存接続に切断シグナルを送ってから新規セッションを開始する。
778 * TCP_SESSION_SAME : 同一セッションの追加パス (マルチパス)
779 * → reset_connection_state() を呼ばずにパスを追加する。
780 * TCP_SESSION_OLD : 旧セッション (再送や遅延パケット等)
781 * → コネクションを閉じてループ先頭へ戻る。
782 * 先読みパケットは tcp_first_pkt_buf/len に保存し、recv スレッドが起動直後に処理する。 */
783static void receiver_accept_loop(struct PotrContext_ *ctx, int path_idx)
784{
785 int is_bidir = (ctx->service.type == POTR_TYPE_TCP_BIDIR);
786 int is_reconnect = 0;
787
788 /* 先読みタイムアウト: TCP ヘルスタイムアウトの 3 倍、未設定時は 30 秒 */
789 uint32_t first_pkt_timeout_ms = (ctx->global.tcp_health_timeout_ms > 0U)
790 ? ctx->global.tcp_health_timeout_ms * 3U
791 : 30000U;
792
793 while (ctx->connect_thread_running[path_idx])
794 {
795 PotrSocket conn;
796 struct sockaddr_in peer_addr;
797 socklen_t peer_len = (socklen_t)sizeof(peer_addr);
798 int active_count;
799 char peer_addr_str[INET_ADDRSTRLEN];
800 int session_result;
801
802 conn = accept(ctx->tcp_listen_sock[path_idx],
803 (struct sockaddr *)&peer_addr, &peer_len);
804
805 if (conn == POTR_INVALID_SOCKET)
806 {
807 if (!ctx->connect_thread_running[path_idx]) break;
808 /* 一時的なエラー: ループ継続 */
810 "connect_thread[service_id=%" PRId64 " path=%d]: accept() error, retrying",
811 ctx->service.service_id, path_idx);
812 continue;
813 }
814
815 inet_ntop(AF_INET, &peer_addr.sin_addr, peer_addr_str, sizeof(peer_addr_str));
816
817 /* 接続元フィルタ: src_addr[path_idx] / src_port が指定されていれば一致確認 */
818 {
819 int filtered = 0;
820 if (ctx->service.src_addr[path_idx][0] != '\0')
821 {
822 if (peer_addr.sin_addr.s_addr !=
823 ctx->src_addr_resolved[path_idx].s_addr)
824 {
825 filtered = 1;
826 }
827 }
828 if (!filtered && ctx->service.src_port != 0)
829 {
830 if (ntohs(peer_addr.sin_port) != ctx->service.src_port)
831 {
832 filtered = 1;
833 }
834 }
835 if (filtered)
836 {
838 "connect_thread[service_id=%" PRId64 " path=%d]: rejected connection"
839 " from %s:%u (src filter)",
840 ctx->service.service_id, path_idx,
841 peer_addr_str,
842 (unsigned)ntohs(peer_addr.sin_port));
843#ifndef _WIN32
844 close(conn);
845#else /* _WIN32 */
846 closesocket(conn);
847#endif /* _WIN32 */
848 continue;
849 }
850 }
851
853 "connect_thread[service_id=%" PRId64 " path=%d]: TCP accepted from %s:%u",
854 ctx->service.service_id, path_idx,
855 peer_addr_str,
856 (unsigned)ntohs(peer_addr.sin_port));
857
858 /* ── セッション判定: 最初の 1 パケットを先読みして session_id を取得する ── */
859 {
860 PotrPacket pkt;
861 size_t pkt_len = 0;
862 int r;
863
864 r = tcp_read_first_packet(conn,
865 ctx->tcp_first_pkt_buf[path_idx],
867 &pkt_len,
868 first_pkt_timeout_ms);
869 if (r <= 0)
870 {
871 /* タイムアウトまたは EOF/エラー */
873 "connect_thread[service_id=%" PRId64 " path=%d]: "
874 "first packet read failed (r=%d), closing",
875 ctx->service.service_id, path_idx, r);
876#ifndef _WIN32
877 close(conn);
878#else /* _WIN32 */
879 closesocket(conn);
880#endif /* _WIN32 */
881 continue;
882 }
883
884 if (packet_parse(&pkt, ctx->tcp_first_pkt_buf[path_idx], pkt_len)
885 != POTR_SUCCESS)
886 {
888 "connect_thread[service_id=%" PRId64 " path=%d]: "
889 "first packet parse failed, closing",
890 ctx->service.service_id, path_idx);
891#ifndef _WIN32
892 close(conn);
893#else /* _WIN32 */
894 closesocket(conn);
895#endif /* _WIN32 */
896 continue;
897 }
898
899 /* session_establish_mutex 下でセッション判定と状態更新を行う */
900#ifndef _WIN32
901 pthread_mutex_lock(&ctx->session_establish_mutex);
902#else /* _WIN32 */
903 EnterCriticalSection(&ctx->session_establish_mutex);
904#endif /* _WIN32 */
905
906 session_result = tcp_session_compare(ctx, &pkt);
907
908 if (session_result == TCP_SESSION_OLD)
909 {
910 /* 旧セッション: 拒否 */
911#ifndef _WIN32
912 pthread_mutex_unlock(&ctx->session_establish_mutex);
913#else /* _WIN32 */
914 LeaveCriticalSection(&ctx->session_establish_mutex);
915#endif /* _WIN32 */
917 "connect_thread[service_id=%" PRId64 " path=%d]: "
918 "old session rejected (known_id=%u pkt_id=%u)",
919 ctx->service.service_id, path_idx,
920 ctx->peer_session_id, pkt.session_id);
921#ifndef _WIN32
922 close(conn);
923#else /* _WIN32 */
924 closesocket(conn);
925#endif /* _WIN32 */
926 continue;
927 }
928
929 if (session_result == TCP_SESSION_NEW)
930 {
931 /* 新セッション: 他 path の既存接続に切断シグナルを送る。
932 * cleanup は各 path の accept スレッドが自然に行う。 */
933 int k;
934 for (k = 0; k < ctx->n_path; k++)
935 {
936 if (k == path_idx) continue;
937 if (ctx->tcp_conn_fd[k] != POTR_INVALID_SOCKET)
938 {
939 ctx->running[k] = 0;
940 close_tcp_conn(ctx, k); /* recv ブロックを解除 */
941 }
942 }
943 reset_connection_state(ctx); /* peer_session_known = 0, frag_buf_len = 0 */
944 }
945 /* TCP_SESSION_SAME の場合は reset 不要 (セッション継続) */
946
947 ctx->tcp_conn_fd[path_idx] = conn;
948 ctx->tcp_last_ping_recv_ms[path_idx] = connect_get_ms();
949 ctx->tcp_last_ping_req_recv_ms[path_idx] = connect_get_ms();
950 ctx->tcp_first_pkt_len[path_idx] = pkt_len; /* 先読みバッファ有効化 */
951
952#ifndef _WIN32
953 pthread_mutex_unlock(&ctx->session_establish_mutex);
954#else /* _WIN32 */
955 LeaveCriticalSection(&ctx->session_establish_mutex);
956#endif /* _WIN32 */
957 }
958 /* ── セッション判定ここまで ── */
959
960 /* tcp_active_paths カウンタをインクリメント (tcp_state_mutex 保護) */
961#ifndef _WIN32
962 pthread_mutex_lock(&ctx->tcp_state_mutex);
963 active_count = ++ctx->tcp_active_paths;
964 pthread_mutex_unlock(&ctx->tcp_state_mutex);
965#else /* _WIN32 */
966 EnterCriticalSection(&ctx->tcp_state_mutex);
967 active_count = ++ctx->tcp_active_paths;
968 LeaveCriticalSection(&ctx->tcp_state_mutex);
969#endif /* _WIN32 */
970 (void)active_count; /* CONNECTED イベントは recv スレッドが最初のパケット受信時に発火 */
971
972 /* TCP_BIDIR 新セッション再接続時 (path[0] のみ): shutdown 済みのキューをリセット */
973 if (is_bidir && session_result == TCP_SESSION_NEW && is_reconnect && path_idx == 0)
974 {
975 reset_send_queue(ctx);
976 }
977
978 if (start_connected_threads(ctx, path_idx) != POTR_SUCCESS)
979 {
980#ifndef _WIN32
981 pthread_mutex_lock(&ctx->tcp_state_mutex);
982 active_count = --ctx->tcp_active_paths;
983 pthread_mutex_unlock(&ctx->tcp_state_mutex);
984#else /* _WIN32 */
985 EnterCriticalSection(&ctx->tcp_state_mutex);
986 active_count = --ctx->tcp_active_paths;
987 LeaveCriticalSection(&ctx->tcp_state_mutex);
988#endif /* _WIN32 */
989 if (active_count == 0)
990 {
992 }
993 ctx->tcp_first_pkt_len[path_idx] = 0; /* 先読みバッファを無効化 */
994 close_tcp_conn(ctx, path_idx);
995 is_reconnect = 1;
996 continue;
997 }
998
999 /* recv スレッドが接続断を検知して自然終了するまで待機する */
1000 join_recv_thread(ctx, path_idx);
1001
1003 "connect_thread[service_id=%" PRId64 " path=%d]: TCP connection closed",
1004 ctx->service.service_id, path_idx);
1005
1006 stop_connected_threads(ctx, path_idx);
1007
1008 /* 先読みバッファをクリア (recv スレッドが未処理のまま終了した場合の安全策) */
1009 ctx->tcp_first_pkt_len[path_idx] = 0;
1010
1011 /* tcp_active_paths カウンタをデクリメント (tcp_state_mutex 保護) */
1012#ifndef _WIN32
1013 pthread_mutex_lock(&ctx->tcp_state_mutex);
1014 active_count = --ctx->tcp_active_paths;
1015 pthread_mutex_unlock(&ctx->tcp_state_mutex);
1016#else /* _WIN32 */
1017 EnterCriticalSection(&ctx->tcp_state_mutex);
1018 active_count = --ctx->tcp_active_paths;
1019 LeaveCriticalSection(&ctx->tcp_state_mutex);
1020#endif /* _WIN32 */
1021
1022 if (active_count == 0)
1023 {
1025 }
1026
1027 is_reconnect = 1;
1028 /* ループ継続: 次の accept へ */
1029 }
1030}
1031
1032/* 接続管理スレッド本体 (ConnectArg* を受け取り、path ごとに動作) */
1033#ifndef _WIN32
1034static void *connect_thread_func(void *arg)
1035#else /* _WIN32 */
1036static DWORD WINAPI connect_thread_func(LPVOID arg)
1037#endif /* _WIN32 */
1038{
1039 ConnectArg *carg = (ConnectArg *)arg;
1040 struct PotrContext_ *ctx = carg->ctx;
1041 int path_idx = carg->path_idx;
1042
1044 "connect_thread[service_id=%" PRId64 " path=%d]: started (role=%s type=%s)",
1045 ctx->service.service_id, path_idx,
1046 (ctx->role == POTR_ROLE_SENDER) ? "SENDER" : "RECEIVER",
1047 (ctx->service.type == POTR_TYPE_TCP_BIDIR) ? "TCP_BIDIR" : "TCP");
1048
1049 if (ctx->role == POTR_ROLE_SENDER)
1050 {
1051 sender_connect_loop(ctx, path_idx);
1052 }
1053 else
1054 {
1055 receiver_accept_loop(ctx, path_idx);
1056 }
1057
1058 ctx->connect_thread_running[path_idx] = 0;
1059
1061 "connect_thread[service_id=%" PRId64 " path=%d]: exited",
1062 ctx->service.service_id, path_idx);
1063
1064#ifndef _WIN32
1065 return NULL;
1066#else /* _WIN32 */
1067 return 0;
1068#endif /* _WIN32 */
1069}
1070
1085{
1086 int i;
1087
1088 if (ctx == NULL)
1089 {
1090 return POTR_ERROR;
1091 }
1092
1094 "connect_thread[service_id=%" PRId64 "]: starting %d path(s)",
1095 ctx->service.service_id, ctx->n_path);
1096
1097 /* RECEIVER: session_establish_mutex と先読みバッファを初期化する */
1098 if (ctx->role == POTR_ROLE_RECEIVER)
1099 {
1100#ifndef _WIN32
1101 pthread_mutex_init(&ctx->session_establish_mutex, NULL);
1102#else /* _WIN32 */
1103 InitializeCriticalSection(&ctx->session_establish_mutex);
1104#endif /* _WIN32 */
1105
1106 for (i = 0; i < ctx->n_path; i++)
1107 {
1108 ctx->tcp_first_pkt_len[i] = 0;
1109 ctx->tcp_first_pkt_buf[i] = (uint8_t *)malloc(
1111 if (ctx->tcp_first_pkt_buf[i] == NULL)
1112 {
1113 int j;
1115 "connect_thread[service_id=%" PRId64 "]: "
1116 "tcp_first_pkt_buf[%d] malloc failed",
1117 ctx->service.service_id, i);
1118 /* 確保済み分を解放 */
1119 for (j = 0; j < i; j++)
1120 {
1121 free(ctx->tcp_first_pkt_buf[j]);
1122 ctx->tcp_first_pkt_buf[j] = NULL;
1123 }
1124#ifndef _WIN32
1125 pthread_mutex_destroy(&ctx->session_establish_mutex);
1126#else /* _WIN32 */
1127 DeleteCriticalSection(&ctx->session_establish_mutex);
1128#endif /* _WIN32 */
1129 return POTR_ERROR;
1130 }
1131 }
1132 }
1133
1134 for (i = 0; i < ctx->n_path; i++)
1135 {
1136 ctx->connect_thread_running[i] = 1;
1137 s_connect_args[i].ctx = ctx;
1138 s_connect_args[i].path_idx = i;
1139
1140#ifndef _WIN32
1141 if (pthread_create(&ctx->connect_thread[i], NULL,
1143 {
1144 ctx->connect_thread_running[i] = 0;
1146 "connect_thread[service_id=%" PRId64 " path=%d]: pthread_create failed",
1147 ctx->service.service_id, i);
1148 return POTR_ERROR;
1149 }
1150#else /* _WIN32 */
1151 ctx->connect_thread[i] = CreateThread(NULL, 0, connect_thread_func,
1152 &s_connect_args[i], 0, NULL);
1153 if (ctx->connect_thread[i] == NULL)
1154 {
1155 ctx->connect_thread_running[i] = 0;
1157 "connect_thread[service_id=%" PRId64 " path=%d]: CreateThread failed",
1158 ctx->service.service_id, i);
1159 /* 起動済み path のスレッドは potr_connect_thread_stop で停止する */
1160 return POTR_ERROR;
1161 }
1162#endif /* _WIN32 */
1163 }
1164
1165 return POTR_SUCCESS;
1166}
1167
1184{
1185 int i;
1186 int any_running = 0;
1187
1188 if (ctx == NULL) { return; }
1189
1190 for (i = 0; i < ctx->n_path; i++)
1191 {
1192 if (ctx->connect_thread_running[i]) { any_running = 1; break; }
1193 }
1194 if (!any_running) { return; }
1195
1196 /* 1. 全 path の停止フラグをクリア */
1197 for (i = 0; i < ctx->n_path; i++)
1198 {
1199 ctx->connect_thread_running[i] = 0;
1200 }
1201
1202 /* 2. reconnect_wait 中の全スレッドを起床させる */
1203#ifndef _WIN32
1204 pthread_mutex_lock(&ctx->tcp_state_mutex);
1205 pthread_cond_broadcast(&ctx->tcp_state_cv);
1206 pthread_mutex_unlock(&ctx->tcp_state_mutex);
1207#else /* _WIN32 */
1208 EnterCriticalSection(&ctx->tcp_state_mutex);
1209 WakeAllConditionVariable(&ctx->tcp_state_cv);
1210 LeaveCriticalSection(&ctx->tcp_state_mutex);
1211#endif /* _WIN32 */
1212
1213 /* 3. RECEIVER: 全 path の listen ソケットをクローズして accept をアンブロック */
1214 if (ctx->role == POTR_ROLE_RECEIVER)
1215 {
1216 for (i = 0; i < ctx->n_path; i++)
1217 {
1218 if (ctx->tcp_listen_sock[i] == POTR_INVALID_SOCKET) continue;
1219#ifndef _WIN32
1220 shutdown(ctx->tcp_listen_sock[i], SHUT_RDWR);
1221 close(ctx->tcp_listen_sock[i]);
1222#else /* _WIN32 */
1223 closesocket(ctx->tcp_listen_sock[i]);
1224#endif /* _WIN32 */
1226 }
1227 }
1228
1229 /* 4. 全 path の接続ソケットをクローズして recv ループをアンブロック */
1230 for (i = 0; i < ctx->n_path; i++)
1231 {
1232 if (ctx->tcp_conn_fd[i] == POTR_INVALID_SOCKET) continue;
1233#ifndef _WIN32
1234 shutdown(ctx->tcp_conn_fd[i], SHUT_RDWR);
1235 close(ctx->tcp_conn_fd[i]);
1236#else /* _WIN32 */
1237 shutdown(ctx->tcp_conn_fd[i], SD_BOTH);
1238 closesocket(ctx->tcp_conn_fd[i]);
1239#endif /* _WIN32 */
1241 }
1242
1243 /* 5. 全 connect スレッドの終了を待機する */
1244#ifndef _WIN32
1245 for (i = 0; i < ctx->n_path; i++)
1246 {
1247 pthread_join(ctx->connect_thread[i], NULL);
1248 }
1249#else /* _WIN32 */
1250 for (i = 0; i < ctx->n_path; i++)
1251 {
1252 if (ctx->connect_thread[i] == NULL) continue;
1253 WaitForSingleObject(ctx->connect_thread[i], INFINITE);
1254 CloseHandle(ctx->connect_thread[i]);
1255 ctx->connect_thread[i] = NULL;
1256 }
1257#endif /* _WIN32 */
1258
1259 /* 6. 送信スレッドを停止する (全 path join 後) */
1261
1262 /* 7. RECEIVER: session_establish_mutex と先読みバッファを破棄する */
1263 if (ctx->role == POTR_ROLE_RECEIVER)
1264 {
1265 for (i = 0; i < ctx->n_path; i++)
1266 {
1267 ctx->tcp_first_pkt_len[i] = 0;
1268 if (ctx->tcp_first_pkt_buf[i] != NULL)
1269 {
1270 free(ctx->tcp_first_pkt_buf[i]);
1271 ctx->tcp_first_pkt_buf[i] = NULL;
1272 }
1273 }
1274#ifndef _WIN32
1275 pthread_mutex_destroy(&ctx->session_establish_mutex);
1276#else /* _WIN32 */
1277 DeleteCriticalSection(&ctx->session_establish_mutex);
1278#endif /* _WIN32 */
1279 }
1280
1282 "connect_thread[service_id=%" PRId64 "]: all paths stopped",
1283 ctx->service.service_id);
1284}
#define POTR_MAX_PATH
マルチパスの最大パス数。
#define POTR_PEER_NA
ピア ID 未割当を示す予約値。 1:1 モードのコールバックで渡される (ピアの概念がない)。 potrSend() に N:1 モードで指定した場合はエラーを返す。
#define POTR_SUCCESS
成功の戻り値を表す定数。
#define POTR_ERROR
失敗の戻り値を表す定数。
int packet_parse(PotrPacket *packet, const void *buf, size_t buf_len)
受信バイト列をパケット構造体に解析します。
Definition packet.c:299
パケット構築・解析モジュールの内部ヘッダー。
#define PACKET_HEADER_SIZE
パケットヘッダーの固定長 (バイト)。payload フィールドの開始オフセット。
Definition packet.h:23
通信ライブラリの定数ファイル。
@ POTR_TRACE_ERROR
エラー。操作の失敗を記録。TRACE_LV_ERROR (1) と同値。
@ POTR_TRACE_INFO
情報。TRACE_LV_INFO (3) と同値。
@ POTR_TRACE_VERBOSE
詳細情報 (デバッグ)。TRACE_LV_VERBOSE (4) と同値。
@ POTR_TRACE_WARNING
警告。回復可能な異常を記録。TRACE_LV_WARNING (2) と同値。
@ POTR_TYPE_TCP_BIDIR
TCP 双方向通信 (両端が potrSend 可)。
@ POTR_ROLE_SENDER
送信者。
@ POTR_ROLE_RECEIVER
受信者。
@ POTR_EVENT_DISCONNECTED
切断を検知 (タイムアウト / FIN 受信 / REJECT 受信)。data=NULL, len=0。
static PotrSocket tcp_connect_with_timeout(struct PotrContext_ *ctx, int path_idx)
void potr_connect_thread_stop(struct PotrContext_ *ctx)
TCP 接続管理スレッドを停止します。
static void * connect_thread_func(void *arg)
static void reset_send_queue(struct PotrContext_ *ctx)
static void join_recv_thread(struct PotrContext_ *ctx, int path_idx)
static void close_tcp_conn(struct PotrContext_ *ctx, int path_idx)
static void receiver_accept_loop(struct PotrContext_ *ctx, int path_idx)
static void reconnect_wait(struct PotrContext_ *ctx, int path_idx, uint32_t wait_ms)
static void reset_all_paths_disconnected(struct PotrContext_ *ctx)
static int tcp_read_first_packet(PotrSocket fd, uint8_t *buf, size_t max_buf, size_t *out_len, uint32_t timeout_ms)
#define TCP_SESSION_OLD
static uint64_t connect_get_ms(void)
static void reset_connection_state(struct PotrContext_ *ctx)
static int start_connected_threads(struct PotrContext_ *ctx, int path_idx)
static int accept_tcp_wait_readable(PotrSocket fd, uint32_t wait_ms)
static ConnectArg s_connect_args[POTR_MAX_PATH]
static void stop_connected_threads(struct PotrContext_ *ctx, int path_idx)
#define TCP_SESSION_NEW
static void sender_connect_loop(struct PotrContext_ *ctx, int path_idx)
static int accept_tcp_read_all(PotrSocket fd, uint8_t *buf, size_t n)
int potr_connect_thread_start(struct PotrContext_ *ctx)
TCP 接続管理スレッドを起動します (path 数分)。
static int tcp_session_compare(const struct PotrContext_ *ctx, const PotrPacket *pkt)
#define TCP_SESSION_SAME
TCP 接続管理スレッドの内部ヘッダー。
セッションコンテキスト内部定義ヘッダー。
#define POTR_INVALID_SOCKET
Definition potrContext.h:36
int PotrSocket
Definition potrContext.h:32
int potr_tcp_health_thread_stop(struct PotrContext_ *ctx, int path_idx)
TCP ヘルスチェックスレッドを停止します。
int potr_tcp_health_thread_start(struct PotrContext_ *ctx, int path_idx)
TCP ヘルスチェックスレッドを path ごとに起動します。
ヘルスチェックスレッド内部ヘッダー。
porter 内部ログマクロ定義ヘッダー。
#define POTR_LOG(level,...)
porter 内部ログ出力マクロ。
Definition potrLog.h:68
int tcp_recv_thread_start(struct PotrContext_ *ctx, int path_idx)
TCP 受信スレッドを path ごとに起動します。
受信スレッド内部ヘッダー。
int potr_send_queue_init(PotrSendQueue *q, size_t depth, uint16_t max_payload)
void potr_send_queue_destroy(PotrSendQueue *q)
非同期送信キューの型定義と操作関数。
int potr_send_thread_start(struct PotrContext_ *ctx)
void potr_send_thread_stop(struct PotrContext_ *ctx)
非同期送信スレッドの内部ヘッダー。
struct PotrContext_ * ctx
セッションコンテキスト構造体。PotrHandle の実体。
volatile int tcp_active_paths
アクティブ TCP path 数 (0 = 全切断)。
PotrRecvCallback callback
受信コールバック。
volatile uint64_t tcp_last_ping_req_recv_ms[POTR_MAX_PATH]
TCP PING 要求最終受信時刻 (ms, CLOCK_MONOTONIC 基準)。path ごと。接続確立時に現在時刻で初期化。
PotrGlobalConfig global
グローバル設定。
volatile uint64_t tcp_last_ping_recv_ms[POTR_MAX_PATH]
TCP PING 応答最終受信時刻 (ms, CLOCK_MONOTONIC 基準)。path ごと。接続確立時に現在時刻で初期化。
struct in_addr dst_addr_resolved[POTR_MAX_PATH]
解決済み宛先 IPv4 アドレス (unicast のみ)。
int64_t peer_session_tv_sec
追跡中の相手セッション開始時刻 秒部。
int32_t peer_session_tv_nsec
追跡中の相手セッション開始時刻 ナノ秒部。
struct in_addr src_addr_resolved[POTR_MAX_PATH]
解決済み送信元 IPv4 アドレス。
PotrRole role
役割 (POTR_ROLE_SENDER / POTR_ROLE_RECEIVER)。
PotrSendQueue send_queue
非同期送信キュー。
volatile int running[POTR_MAX_PATH]
受信スレッド実行フラグ (1: 実行中, 0: 停止)。path ごと。
volatile int health_alive
疎通状態 (1: alive, 0: dead/未接続)。UDP 用。受信者が管理。
size_t frag_buf_len
フラグメント結合バッファの現在のデータ長 (バイト)。
PotrThread recv_thread[POTR_MAX_PATH]
受信スレッドハンドル (path ごと)。
uint32_t peer_session_id
追跡中の相手セッション識別子。
size_t tcp_first_pkt_len[POTR_MAX_PATH]
先読みパケットのバイト数 (0: 先読みなし)。
PotrMutex tcp_state_mutex
tcp_state_cv 保護用ミューテックス。tcp_active_paths のカウンタ更新も保護。
PotrMutex session_establish_mutex
PotrSocket tcp_listen_sock[POTR_MAX_PATH]
RECEIVER: listen ソケット (path ごと)。
volatile int connect_thread_running[POTR_MAX_PATH]
connect スレッド実行フラグ (1: 実行中, 0: 停止)。path ごと。
PotrServiceDef service
サービス定義。
uint8_t * tcp_first_pkt_buf[POTR_MAX_PATH]
先読みパケットバッファ (動的確保、PACKET_HEADER_SIZE + max_payload バイト)。
int peer_session_known
相手セッションが初期化済みか (0: 未初期化)。
PotrThread connect_thread[POTR_MAX_PATH]
SENDER: connect スレッド。RECEIVER: accept スレッド。path ごと。
PotrWindow send_window
送信バッファ (過去 N パケット保持。NACK 再送・REJECT 判定に使用)。
volatile int send_thread_running
送信スレッド実行フラグ (1: 実行中, 0: 停止)。
PotrCondVar tcp_state_cv
切断通知・reconnect sleep の中断用条件変数。
int n_path
有効パス数。
PotrSocket tcp_conn_fd[POTR_MAX_PATH]
アクティブ TCP 接続 fd (path ごと)。
uint32_t tcp_health_timeout_ms
TCP 通信種別の PING 応答待機タイムアウト (ミリ秒)。SENDER 側で使用。0 = 無効。設定ファイルキー: tcp_health_timeout_ms。
uint16_t max_payload
最大ペイロード長 (バイト)。
ネットワーク送受信用パケット構造体。
int64_t session_tv_sec
セッション開始時刻 秒部 (NBO)。struct timespec の tv_sec 相当。
uint32_t session_id
セッション識別子 (NBO)。potrOpenService 時に決定する乱数。
int32_t session_tv_nsec
セッション開始時刻 ナノ秒部 (NBO)。struct timespec の tv_nsec 相当。
size_t depth
キュー容量 (エントリ数)。
uint16_t src_port
送信者の送信元 bind ポート番号。0 = OS 自動選定。(全通信種別で省略可)
uint16_t dst_port
宛先ポート番号。サービスの識別子。受信者の bind ポート / 送信者の送信先ポート。(全通信種別で必須)
char src_addr[POTR_MAX_PATH][POTR_MAX_ADDR_LEN]
送信元アドレス [0]=src_addr1 〜 [3]=src_addr4。送信者は bind / 送信インターフェース、受信者は送信元フィルタ。(全通信種別で必須)
PotrType type
通信種別。
int64_t service_id
サービス ID。
uint32_t connect_timeout_ms
SENDER TCP 接続タイムアウト (ms)。0 = OS デフォルト。デフォルト: POTR_DEFAULT_CONNECT_TIMEOUT_MS。
char dst_addr[POTR_MAX_PATH][POTR_MAX_ADDR_LEN]
宛先アドレス [0]=dst_addr1 〜 [3]=dst_addr4。送信者は送信先、受信者は bind アドレス。(unicast のみ)
uint32_t reconnect_interval_ms
SENDER 自動再接続間隔 (ms)。0 = 再接続なし。デフォルト: POTR_DEFAULT_RECONNECT_INTERVAL_MS。
uint32_t base_seq
ウィンドウ先頭の通番。
Definition window.h:34
uint32_t next_seq
送信側: 次に割り当てる通番。受信側: 次に期待する通番。
Definition window.h:35
uint8_t * valid
バッファ有効フラグ配列 (動的確保。window_size バイト)。
Definition window.h:32
uint16_t window_size
ウィンドウサイズ (パケット数)。
Definition window.h:36
スライディングウィンドウ管理モジュールの内部ヘッダー。