Document of c-modernization-kit (porter) 1.0.0
Loading...
Searching...
No Matches
potrHealthThread.c
Go to the documentation of this file.
1
18
19#include <string.h>
20#include <inttypes.h>
21
22#ifndef _WIN32
23 #include <sys/socket.h>
24 #include <netinet/in.h>
25 #include <arpa/inet.h>
26 #include <unistd.h>
27 #include <time.h>
28#else /* _WIN32 */
29 #include <winsock2.h>
30 #include <ws2tcpip.h>
31#endif /* _WIN32 */
32
33#include <porter_const.h>
34#include <porter.h>
35
36#include "../protocol/packet.h"
37#include "../protocol/window.h"
38#include "../potrContext.h"
39#include "../potrPeerTable.h"
40#include "potrHealthThread.h"
41#include "../infra/potrLog.h"
43
44#ifndef _WIN32
45 typedef pthread_mutex_t PotrMutexLocal;
46 #define POTR_MUTEX_LOCK_LOCAL(m) pthread_mutex_lock(m)
47 #define POTR_MUTEX_UNLOCK_LOCAL(m) pthread_mutex_unlock(m)
48#else /* _WIN32 */
49 typedef CRITICAL_SECTION PotrMutexLocal;
50 #define POTR_MUTEX_LOCK_LOCAL(m) EnterCriticalSection(m)
51 #define POTR_MUTEX_UNLOCK_LOCAL(m) LeaveCriticalSection(m)
52#endif /* _WIN32 */
53
54/* TCP health スレッドに渡す引数 (path ごと) */
55typedef struct
56{
59 int _pad;
60} HealthArg;
61
63
64/* health_interval_ms ミリ秒、または停止シグナルが来るまでスリープする (path_idx 版) */
65static void health_sleep(struct PotrContext_ *ctx, int path_idx, uint32_t interval_ms)
66{
67#ifndef _WIN32
68 struct timespec abs_ts;
69 clock_gettime(CLOCK_REALTIME, &abs_ts);
70 abs_ts.tv_sec += (time_t)(interval_ms / 1000U);
71 abs_ts.tv_nsec += (long)((interval_ms % 1000U) * 1000000UL);
72 if (abs_ts.tv_nsec >= 1000000000L)
73 {
74 abs_ts.tv_sec++;
75 abs_ts.tv_nsec -= 1000000000L;
76 }
77 pthread_mutex_lock(&ctx->health_mutex[path_idx]);
78 if (ctx->health_running[path_idx])
79 {
80 pthread_cond_timedwait(&ctx->health_wakeup[path_idx],
81 &ctx->health_mutex[path_idx], &abs_ts);
82 }
83 pthread_mutex_unlock(&ctx->health_mutex[path_idx]);
84#else /* _WIN32 */
85 EnterCriticalSection(&ctx->health_mutex[path_idx]);
86 if (ctx->health_running[path_idx])
87 {
88 SleepConditionVariableCS(&ctx->health_wakeup[path_idx],
89 &ctx->health_mutex[path_idx], (DWORD)interval_ms);
90 }
91 LeaveCriticalSection(&ctx->health_mutex[path_idx]);
92#endif /* _WIN32 */
93}
94
95/* 現在時刻をミリ秒単位で返す (単調増加クロック) */
96static uint64_t health_get_ms(void)
97{
98#ifndef _WIN32
99 struct timespec ts;
100 clock_gettime(CLOCK_MONOTONIC, &ts);
101 return (uint64_t)ts.tv_sec * 1000ULL + (uint64_t)ts.tv_nsec / 1000000ULL;
102#else /* _WIN32 */
103 return (uint64_t)GetTickCount64();
104#endif /* _WIN32 */
105}
106
107/* ====================================================================
108 * 非 TCP (UDP/マルチキャスト) 用ヘルスチェックスレッド本体
109 * ==================================================================== */
110#ifndef _WIN32
111static void *health_thread_func(void *arg)
112#else /* _WIN32 */
113static DWORD WINAPI health_thread_func(LPVOID arg)
114#endif /* _WIN32 */
115{
116 struct PotrContext_ *ctx = (struct PotrContext_ *)arg;
118
119 shdr.service_id = ctx->service.service_id;
120 /* 1:1 モード: session はコンテキストから取得 (N:1 は後述のループで各ピアから取得) */
121 if (!ctx->is_multi_peer)
122 {
123 shdr.session_id = ctx->session_id;
124 shdr.session_tv_sec = ctx->session_tv_sec;
126 }
127
128 while (ctx->health_running[0])
129 {
130 uint32_t sleep_ms;
131 uint64_t last_send = ctx->last_send_ms;
132
133 if (last_send == 0)
134 {
135 sleep_ms = ctx->global.health_interval_ms;
136 }
137 else
138 {
139 uint64_t elapsed = health_get_ms() - last_send;
140 sleep_ms = (elapsed >= (uint64_t)ctx->global.health_interval_ms)
141 ? 1U
142 : (uint32_t)(ctx->global.health_interval_ms - elapsed);
143 }
144
145 health_sleep(ctx, 0, sleep_ms);
146
147 if (!ctx->health_running[0]) break;
148
149 /* データ送信によりタイマーがリセットされた場合は PING 不要 */
150 last_send = ctx->last_send_ms;
151 if (last_send != 0)
152 {
153 uint64_t elapsed = health_get_ms() - last_send;
154 if (elapsed < (uint64_t)ctx->global.health_interval_ms)
155 {
157 "health[service_id=%" PRId64 "]: PING timer reset",
158 ctx->service.service_id);
159 continue;
160 }
161 }
162
163 /* PING を送信する */
164 if (ctx->is_multi_peer)
165 {
166 /* N:1 モード: アクティブ全ピアへ PING を送信する */
167 int i;
168
170
171 for (i = 0; i < ctx->max_peers && ctx->health_running[0]; i++)
172 {
173 PotrPacket ping_pkt;
174 PotrPacketSessionHdr peer_shdr;
175 uint32_t seq;
176 size_t wire_len;
177 int k;
178
179 if (!ctx->peers[i].active) continue;
180
181 peer_shdr.service_id = ctx->service.service_id;
182 peer_shdr.session_id = ctx->peers[i].session_id;
183 peer_shdr.session_tv_sec = ctx->peers[i].session_tv_sec;
184 peer_shdr.session_tv_nsec = ctx->peers[i].session_tv_nsec;
185
186#ifndef _WIN32
187 pthread_mutex_lock(&ctx->peers[i].send_window_mutex);
188#else /* _WIN32 */
189 EnterCriticalSection(&ctx->peers[i].send_window_mutex);
190#endif /* _WIN32 */
191 seq = ctx->peers[i].send_window.next_seq;
192 packet_build_ping(&ping_pkt, &peer_shdr, seq, 0U);
193#ifndef _WIN32
194 pthread_mutex_unlock(&ctx->peers[i].send_window_mutex);
195#else /* _WIN32 */
196 LeaveCriticalSection(&ctx->peers[i].send_window_mutex);
197#endif /* _WIN32 */
198
199 if (ctx->service.encrypt_enabled)
200 {
201 uint8_t wire_buf[PACKET_HEADER_SIZE + POTR_CRYPTO_TAG_SIZE];
202 uint8_t nonce[POTR_CRYPTO_NONCE_SIZE];
203 size_t enc_out = POTR_CRYPTO_TAG_SIZE;
204
205 ping_pkt.flags |= htons(POTR_FLAG_ENCRYPTED);
206 ping_pkt.payload_len = htons((uint16_t)POTR_CRYPTO_TAG_SIZE);
207
208 memcpy(nonce, &ping_pkt.session_id, 4);
209 memcpy(nonce + 4, &ping_pkt.flags, 2);
210 memcpy(nonce + 6, &ping_pkt.seq_num, 4);
211 memset(nonce + 10, 0, 2);
212
213 memcpy(wire_buf, &ping_pkt, PACKET_HEADER_SIZE);
214 if (potr_encrypt(wire_buf + PACKET_HEADER_SIZE, &enc_out,
215 NULL, 0,
216 ctx->service.encrypt_key,
217 nonce,
218 wire_buf, PACKET_HEADER_SIZE) != 0)
219 {
220 continue;
221 }
222 wire_len = PACKET_HEADER_SIZE + enc_out;
223
224 for (k = 0; k < (int)POTR_MAX_PATH; k++)
225 {
226 if (ctx->peers[i].dest_addr[k].sin_family == 0) continue;
227#ifndef _WIN32
228 sendto(ctx->sock[k], wire_buf, wire_len, 0,
229 (const struct sockaddr *)&ctx->peers[i].dest_addr[k],
230 sizeof(ctx->peers[i].dest_addr[k]));
231#else /* _WIN32 */
232 sendto(ctx->sock[k], (const char *)wire_buf, (int)wire_len, 0,
233 (const struct sockaddr *)&ctx->peers[i].dest_addr[k],
234 sizeof(ctx->peers[i].dest_addr[k]));
235#endif /* _WIN32 */
236 }
237 }
238 else
239 {
240 wire_len = packet_wire_size(&ping_pkt);
241
242 for (k = 0; k < (int)POTR_MAX_PATH; k++)
243 {
244 if (ctx->peers[i].dest_addr[k].sin_family == 0) continue;
245#ifndef _WIN32
246 sendto(ctx->sock[k], &ping_pkt, wire_len, 0,
247 (const struct sockaddr *)&ctx->peers[i].dest_addr[k],
248 sizeof(ctx->peers[i].dest_addr[k]));
249#else /* _WIN32 */
250 sendto(ctx->sock[k], (const char *)&ping_pkt, (int)wire_len, 0,
251 (const struct sockaddr *)&ctx->peers[i].dest_addr[k],
252 sizeof(ctx->peers[i].dest_addr[k]));
253#endif /* _WIN32 */
254 }
255 }
256
258 "health[service_id=%" PRId64 "]: PING peer=%u seq=%u",
259 ctx->service.service_id,
260 (unsigned)ctx->peers[i].peer_id, (unsigned)seq);
261 }
262
264 }
265 else
266 {
267 /* 1:1 UDP/マルチキャスト: 単一宛先へ PING を送信する */
268 PotrPacket ping_pkt;
269 uint32_t seq;
270 size_t wire_len;
271 int build_result;
272
274 seq = ctx->send_window.next_seq;
275 build_result = packet_build_ping(&ping_pkt, &shdr, seq, 0U);
277
278 if (build_result != POTR_SUCCESS) { continue; }
279
281 "health[service_id=%" PRId64 "]: PING seq=%u",
282 ctx->service.service_id, (unsigned)seq);
283
284 if (ctx->service.encrypt_enabled)
285 {
286 uint8_t wire_buf[PACKET_HEADER_SIZE + POTR_CRYPTO_TAG_SIZE];
287 uint8_t nonce[POTR_CRYPTO_NONCE_SIZE];
288 size_t enc_out = POTR_CRYPTO_TAG_SIZE;
289 int k;
290
291 ping_pkt.flags |= htons(POTR_FLAG_ENCRYPTED);
292 ping_pkt.payload_len = htons((uint16_t)POTR_CRYPTO_TAG_SIZE);
293
294 memcpy(nonce, &ping_pkt.session_id, 4);
295 memcpy(nonce + 4, &ping_pkt.flags, 2);
296 memcpy(nonce + 6, &ping_pkt.seq_num, 4);
297 memset(nonce + 10, 0, 2);
298
299 memcpy(wire_buf, &ping_pkt, PACKET_HEADER_SIZE);
300 if (potr_encrypt(wire_buf + PACKET_HEADER_SIZE, &enc_out,
301 NULL, 0,
302 ctx->service.encrypt_key,
303 nonce,
304 wire_buf, PACKET_HEADER_SIZE) != 0) { continue; }
305 wire_len = PACKET_HEADER_SIZE + enc_out;
306
307 for (k = 0; k < ctx->n_path; k++)
308 {
309#ifndef _WIN32
310 sendto(ctx->sock[k], wire_buf, wire_len, 0,
311 (const struct sockaddr *)&ctx->dest_addr[k],
312 sizeof(ctx->dest_addr[k]));
313#else /* _WIN32 */
314 sendto(ctx->sock[k], (const char *)wire_buf, (int)wire_len, 0,
315 (const struct sockaddr *)&ctx->dest_addr[k],
316 sizeof(ctx->dest_addr[k]));
317#endif /* _WIN32 */
318 }
319 }
320 else
321 {
322 int k;
323 wire_len = packet_wire_size(&ping_pkt);
324
325 for (k = 0; k < ctx->n_path; k++)
326 {
327#ifndef _WIN32
328 sendto(ctx->sock[k], &ping_pkt, wire_len, 0,
329 (const struct sockaddr *)&ctx->dest_addr[k],
330 sizeof(ctx->dest_addr[k]));
331#else /* _WIN32 */
332 sendto(ctx->sock[k], (const char *)&ping_pkt, (int)wire_len, 0,
333 (const struct sockaddr *)&ctx->dest_addr[k],
334 sizeof(ctx->dest_addr[k]));
335#endif /* _WIN32 */
336 }
337 }
338 }
339
341 }
342
343#ifndef _WIN32
344 return NULL;
345#else /* _WIN32 */
346 return 0;
347#endif /* _WIN32 */
348}
349
350/* ====================================================================
351 * TCP 用ヘルスチェックスレッド本体 (path ごと)
352 * ==================================================================== */
353#ifndef _WIN32
354static void *tcp_health_thread_func(void *arg)
355#else /* _WIN32 */
356static DWORD WINAPI tcp_health_thread_func(LPVOID arg)
357#endif /* _WIN32 */
358{
359 HealthArg *harg = (HealthArg *)arg;
360 struct PotrContext_ *ctx = harg->ctx;
361 int path_idx = harg->path_idx;
363
364 shdr.service_id = ctx->service.service_id;
365 shdr.session_id = ctx->session_id;
366 shdr.session_tv_sec = ctx->session_tv_sec;
368
370 "tcp_health[service_id=%" PRId64 " path=%d]: starting",
371 ctx->service.service_id, path_idx);
372
373 while (ctx->health_running[path_idx])
374 {
375 PotrPacket ping_pkt;
376 uint32_t seq;
377 size_t wire_len;
378 int build_result;
379
380 /* 固定間隔でスリープ */
381 health_sleep(ctx, path_idx, ctx->global.health_interval_ms);
382
383 if (!ctx->health_running[path_idx]) break;
384
385 /* PING タイムアウト判定 (health_timeout_ms 超過でソケットをクローズ) */
386 if (ctx->global.health_timeout_ms > 0)
387 {
388 uint64_t last = ctx->tcp_last_ping_recv_ms[path_idx];
389 if (last > 0
390 && health_get_ms() - last > (uint64_t)ctx->global.health_timeout_ms)
391 {
393 "tcp_health[service_id=%" PRId64 " path=%d]: PING timeout"
394 " (%llu ms), closing connection",
395 ctx->service.service_id, path_idx,
396 (unsigned long long)(health_get_ms() - last));
397 /* ソケットをクローズ → recv スレッドが切断を検知する */
398#ifndef _WIN32
399 shutdown(ctx->tcp_conn_fd[path_idx], SHUT_RDWR);
400 close(ctx->tcp_conn_fd[path_idx]);
401#else /* _WIN32 */
402 shutdown(ctx->tcp_conn_fd[path_idx], SD_BOTH);
403 closesocket(ctx->tcp_conn_fd[path_idx]);
404#endif /* _WIN32 */
405 ctx->tcp_conn_fd[path_idx] = POTR_INVALID_SOCKET;
406 continue; /* connect スレッドに停止されるまで継続 */
407 }
408 }
409
410 /* 接続中でなければ PING をスキップ */
411 if (ctx->tcp_active_paths == 0
412 || ctx->tcp_conn_fd[path_idx] == POTR_INVALID_SOCKET)
413 {
414 continue;
415 }
416
417 /* PING パケットを構築する */
419 /* session 情報を最新に更新 (再接続時に変わっている可能性がある) */
420 shdr.session_id = ctx->session_id;
421 shdr.session_tv_sec = ctx->session_tv_sec;
423 seq = ctx->send_window.next_seq;
424 build_result = packet_build_ping(&ping_pkt, &shdr, seq, 0U);
426
427 if (build_result != POTR_SUCCESS) { continue; }
428
430 "tcp_health[service_id=%" PRId64 " path=%d]: PING seq=%u",
431 ctx->service.service_id, path_idx, (unsigned)seq);
432
433 if (ctx->service.encrypt_enabled)
434 {
435 uint8_t wire_buf[PACKET_HEADER_SIZE + POTR_CRYPTO_TAG_SIZE];
436 uint8_t nonce[POTR_CRYPTO_NONCE_SIZE];
437 size_t enc_out = POTR_CRYPTO_TAG_SIZE;
438
439 ping_pkt.flags |= htons(POTR_FLAG_ENCRYPTED);
440 ping_pkt.payload_len = htons((uint16_t)POTR_CRYPTO_TAG_SIZE);
441
442 memcpy(nonce, &ping_pkt.session_id, 4);
443 memcpy(nonce + 4, &ping_pkt.flags, 2);
444 memcpy(nonce + 6, &ping_pkt.seq_num, 4);
445 memset(nonce + 10, 0, 2);
446
447 memcpy(wire_buf, &ping_pkt, PACKET_HEADER_SIZE);
448 if (potr_encrypt(wire_buf + PACKET_HEADER_SIZE, &enc_out,
449 NULL, 0,
450 ctx->service.encrypt_key,
451 nonce,
452 wire_buf, PACKET_HEADER_SIZE) != 0) { continue; }
453 wire_len = PACKET_HEADER_SIZE + enc_out;
454
455 POTR_MUTEX_LOCK_LOCAL(&ctx->tcp_send_mutex[path_idx]);
456 if (ctx->tcp_conn_fd[path_idx] != POTR_INVALID_SOCKET)
457 {
458#ifndef _WIN32
459 {
460 size_t sent = 0;
461 const uint8_t *p = wire_buf;
462 while (sent < wire_len)
463 {
464 ssize_t n = send(ctx->tcp_conn_fd[path_idx],
465 p + sent, wire_len - sent, 0);
466 if (n <= 0) break;
467 sent += (size_t)n;
468 }
469 }
470#else /* _WIN32 */
471 send(ctx->tcp_conn_fd[path_idx],
472 (const char *)wire_buf, (int)wire_len, 0);
473#endif /* _WIN32 */
474 }
476 }
477 else
478 {
479 wire_len = packet_wire_size(&ping_pkt);
480
481 POTR_MUTEX_LOCK_LOCAL(&ctx->tcp_send_mutex[path_idx]);
482 if (ctx->tcp_conn_fd[path_idx] != POTR_INVALID_SOCKET)
483 {
484#ifndef _WIN32
485 {
486 size_t sent = 0;
487 const uint8_t *p = (const uint8_t *)&ping_pkt;
488 while (sent < wire_len)
489 {
490 ssize_t n = send(ctx->tcp_conn_fd[path_idx],
491 p + sent, wire_len - sent, 0);
492 if (n <= 0) break;
493 sent += (size_t)n;
494 }
495 }
496#else /* _WIN32 */
497 send(ctx->tcp_conn_fd[path_idx],
498 (const char *)&ping_pkt, (int)wire_len, 0);
499#endif /* _WIN32 */
500 }
502 }
503 }
504
506 "tcp_health[service_id=%" PRId64 " path=%d]: exited",
507 ctx->service.service_id, path_idx);
508
509#ifndef _WIN32
510 return NULL;
511#else /* _WIN32 */
512 return 0;
513#endif /* _WIN32 */
514}
515
527{
528 if (ctx == NULL) { return POTR_ERROR; }
529
530 if (ctx->global.health_interval_ms == 0)
531 {
533 "health_thread[service_id=%" PRId64 "]: disabled (health_interval_ms=0)",
534 ctx->service.service_id);
535 return POTR_SUCCESS;
536 }
537
539 "health_thread[service_id=%" PRId64 "]: starting (interval=%ums)",
540 ctx->service.service_id,
541 (unsigned)ctx->global.health_interval_ms);
542
543#ifndef _WIN32
544 pthread_mutex_init(&ctx->health_mutex[0], NULL);
545 pthread_cond_init(&ctx->health_wakeup[0], NULL);
546#else /* _WIN32 */
547 InitializeCriticalSection(&ctx->health_mutex[0]);
548 InitializeConditionVariable(&ctx->health_wakeup[0]);
549#endif /* _WIN32 */
550
551 ctx->health_running[0] = 1;
552
553#ifndef _WIN32
554 if (pthread_create(&ctx->health_thread[0], NULL, health_thread_func, ctx) != 0)
555 {
556 ctx->health_running[0] = 0;
558 "health_thread[service_id=%" PRId64 "]: pthread_create failed",
559 ctx->service.service_id);
560 return POTR_ERROR;
561 }
562#else /* _WIN32 */
563 ctx->health_thread[0] = CreateThread(NULL, 0, health_thread_func, ctx, 0, NULL);
564 if (ctx->health_thread[0] == NULL)
565 {
566 ctx->health_running[0] = 0;
568 "health_thread[service_id=%" PRId64 "]: CreateThread failed",
569 ctx->service.service_id);
570 return POTR_ERROR;
571 }
572#endif /* _WIN32 */
573
574 return POTR_SUCCESS;
575}
576
585{
586 if (ctx == NULL) { return POTR_ERROR; }
587 if (!ctx->health_running[0]) { return POTR_SUCCESS; }
588
589 ctx->health_running[0] = 0;
590
591#ifndef _WIN32
592 pthread_mutex_lock(&ctx->health_mutex[0]);
593 pthread_cond_signal(&ctx->health_wakeup[0]);
594 pthread_mutex_unlock(&ctx->health_mutex[0]);
595
596 pthread_join(ctx->health_thread[0], NULL);
597
598 pthread_cond_destroy(&ctx->health_wakeup[0]);
599 pthread_mutex_destroy(&ctx->health_mutex[0]);
600#else /* _WIN32 */
601 if (ctx->health_thread[0] != NULL)
602 {
603 EnterCriticalSection(&ctx->health_mutex[0]);
604 WakeConditionVariable(&ctx->health_wakeup[0]);
605 LeaveCriticalSection(&ctx->health_mutex[0]);
606
607 WaitForSingleObject(ctx->health_thread[0], INFINITE);
608 CloseHandle(ctx->health_thread[0]);
609 ctx->health_thread[0] = NULL;
610 }
611 DeleteCriticalSection(&ctx->health_mutex[0]);
612 /* Windows の CONDITION_VARIABLE は破棄不要 */
613#endif /* _WIN32 */
614
615 return POTR_SUCCESS;
616}
617
629int potr_tcp_health_thread_start(struct PotrContext_ *ctx, int path_idx)
630{
631 if (ctx == NULL) { return POTR_ERROR; }
632
633 if (ctx->global.health_interval_ms == 0)
634 {
636 "tcp_health_thread[service_id=%" PRId64 " path=%d]: disabled",
637 ctx->service.service_id, path_idx);
638 return POTR_SUCCESS;
639 }
640
641 s_health_args[path_idx].ctx = ctx;
642 s_health_args[path_idx].path_idx = path_idx;
643
644 ctx->health_running[path_idx] = 1;
645
646#ifndef _WIN32
647 if (pthread_create(&ctx->health_thread[path_idx], NULL,
648 tcp_health_thread_func, &s_health_args[path_idx]) != 0)
649 {
650 ctx->health_running[path_idx] = 0;
652 "tcp_health_thread[service_id=%" PRId64 " path=%d]: pthread_create failed",
653 ctx->service.service_id, path_idx);
654 return POTR_ERROR;
655 }
656#else /* _WIN32 */
657 ctx->health_thread[path_idx] = CreateThread(NULL, 0,
659 &s_health_args[path_idx], 0, NULL);
660 if (ctx->health_thread[path_idx] == NULL)
661 {
662 ctx->health_running[path_idx] = 0;
664 "tcp_health_thread[service_id=%" PRId64 " path=%d]: CreateThread failed",
665 ctx->service.service_id, path_idx);
666 return POTR_ERROR;
667 }
668#endif /* _WIN32 */
669
670 return POTR_SUCCESS;
671}
672
681int potr_tcp_health_thread_stop(struct PotrContext_ *ctx, int path_idx)
682{
683 if (ctx == NULL) { return POTR_ERROR; }
684 if (!ctx->health_running[path_idx]) { return POTR_SUCCESS; }
685
686 ctx->health_running[path_idx] = 0;
687
688#ifndef _WIN32
689 pthread_mutex_lock(&ctx->health_mutex[path_idx]);
690 pthread_cond_signal(&ctx->health_wakeup[path_idx]);
691 pthread_mutex_unlock(&ctx->health_mutex[path_idx]);
692
693 pthread_join(ctx->health_thread[path_idx], NULL);
694#else /* _WIN32 */
695 if (ctx->health_thread[path_idx] != NULL)
696 {
697 EnterCriticalSection(&ctx->health_mutex[path_idx]);
698 WakeConditionVariable(&ctx->health_wakeup[path_idx]);
699 LeaveCriticalSection(&ctx->health_mutex[path_idx]);
700
701 WaitForSingleObject(ctx->health_thread[path_idx], INFINITE);
702 CloseHandle(ctx->health_thread[path_idx]);
703 ctx->health_thread[path_idx] = NULL;
704 }
705#endif /* _WIN32 */
706
707 return POTR_SUCCESS;
708}
データ暗号化・復号モジュールの内部ヘッダー。
int potr_encrypt(uint8_t *dst, size_t *dst_len, const uint8_t *src, size_t src_len, const uint8_t *key, const uint8_t *nonce, const uint8_t *aad, size_t aad_len)
AES-256-GCM でデータを暗号化します。
#define POTR_CRYPTO_NONCE_SIZE
AES-256-GCM ノンスサイズ (バイト)。session_id (4B NBO) + flags (2B NBO) + seq_or_ack_num (4B NBO) + padding (2B...
#define POTR_CRYPTO_TAG_SIZE
AES-256-GCM 認証タグサイズ (バイト)。暗号文の直後に付加する。
#define POTR_MAX_PATH
マルチパスの最大パス数。
#define POTR_FLAG_ENCRYPTED
AES-256-GCM 認証タグが付与されていることを示す外側パケットフラグ。 POTR_FLAG_DATA と組み合わせる場合: [ヘッダー 32B][暗号文: packed_len B][GCM ...
#define POTR_SUCCESS
成功の戻り値を表す定数。
#define POTR_ERROR
失敗の戻り値を表す定数。
int packet_build_ping(PotrPacket *packet, const PotrPacketSessionHdr *shdr, uint32_t seq_num, uint32_t ack_num)
PING パケットを構築します。
Definition packet.c:99
size_t packet_wire_size(const PotrPacket *packet)
パケットのヘッダー + ペイロードの合計バイト数を返します。
Definition packet.c:347
パケット構築・解析モジュールの内部ヘッダー。
#define PACKET_HEADER_SIZE
パケットヘッダーの固定長 (バイト)。payload フィールドの開始オフセット。
Definition packet.h:23
通信ライブラリ (動的リンク用) のヘッダーファイル。
通信ライブラリの定数ファイル。
@ POTR_TRACE_ERROR
エラー。操作の失敗を記録。TRACE_LV_ERROR (1) と同値。
@ POTR_TRACE_VERBOSE
詳細情報 (デバッグ)。TRACE_LV_VERBOSE (4) と同値。
@ POTR_TRACE_WARNING
警告。回復可能な異常を記録。TRACE_LV_WARNING (2) と同値。
セッションコンテキスト内部定義ヘッダー。
#define POTR_INVALID_SOCKET
Definition potrContext.h:36
pthread_mutex_t PotrMutexLocal
static void health_sleep(struct PotrContext_ *ctx, int path_idx, uint32_t interval_ms)
#define POTR_MUTEX_LOCK_LOCAL(m)
int potr_tcp_health_thread_stop(struct PotrContext_ *ctx, int path_idx)
TCP ヘルスチェックスレッドを停止します。
#define POTR_MUTEX_UNLOCK_LOCAL(m)
static void * health_thread_func(void *arg)
int potr_health_thread_start(struct PotrContext_ *ctx)
非 TCP ヘルスチェックスレッドを起動します。
int potr_health_thread_stop(struct PotrContext_ *ctx)
非 TCP ヘルスチェックスレッドを停止します。
static HealthArg s_health_args[POTR_MAX_PATH]
static void * tcp_health_thread_func(void *arg)
static uint64_t health_get_ms(void)
int potr_tcp_health_thread_start(struct PotrContext_ *ctx, int path_idx)
TCP ヘルスチェックスレッドを path ごとに起動します。
ヘルスチェックスレッド内部ヘッダー。
porter 内部ログマクロ定義ヘッダー。
#define POTR_LOG(level,...)
porter 内部ログ出力マクロ。
Definition potrLog.h:68
N:1 モード用ピアテーブル管理モジュールの内部ヘッダー。
struct PotrContext_ * ctx
セッションコンテキスト構造体。PotrHandle の実体。
uint32_t session_id
自セッション識別子 (乱数)。
volatile int tcp_active_paths
アクティブ TCP path 数 (0 = 全切断)。
PotrGlobalConfig global
グローバル設定。
volatile uint64_t tcp_last_ping_recv_ms[POTR_MAX_PATH]
TCP PING 応答最終受信時刻 (ms, CLOCK_MONOTONIC 基準)。path ごと。接続確立時に現在時刻で初期化。
PotrCondVar health_wakeup[POTR_MAX_PATH]
ヘルスチェックスレッドを即時起床させる条件変数 (path ごと)。
PotrMutex send_window_mutex
send_window 保護用ミューテックス (送信スレッド・ヘルスチェックスレッド・受信スレッドが競合するため)。
int32_t session_tv_nsec
自セッション開始時刻 ナノ秒部。
volatile uint64_t last_send_ms
最終パケット送信時刻 (データ or PING、ms、単調増加)。0 = 未送信。
PotrMutex tcp_send_mutex[POTR_MAX_PATH]
TCP send() 排他制御 (path ごと)。送信スレッド・ヘルスチェックスレッド・recv スレッド競合防止。
int is_multi_peer
1: N:1 モード (src_addr/src_port 省略), 0: 1:1 モード。
PotrServiceDef service
サービス定義。
PotrThread health_thread[POTR_MAX_PATH]
ヘルスチェックスレッドハンドル (path ごと、送信者のみ)。
PotrMutex health_mutex[POTR_MAX_PATH]
ヘルスチェックスレッド停止用ミューテックス (path ごと)。
PotrMutex peers_mutex
ピアテーブル保護用ミューテックス。
PotrSocket sock[POTR_MAX_PATH]
各パスの UDP ソケット。
volatile int health_running[POTR_MAX_PATH]
ヘルスチェックスレッド実行フラグ (1: 実行中, 0: 停止)。path ごと。
PotrWindow send_window
送信バッファ (過去 N パケット保持。NACK 再送・REJECT 判定に使用)。
int max_peers
ピアテーブルサイズ (service.max_peers から取得)。
struct sockaddr_in dest_addr[POTR_MAX_PATH]
送信先ソケットアドレス (送信者が sendto に使用)。
int64_t session_tv_sec
自セッション開始時刻 秒部。
int n_path
有効パス数。
PotrPeerContext * peers
ピアテーブル (動的確保。max_peers エントリ)。
PotrSocket tcp_conn_fd[POTR_MAX_PATH]
アクティブ TCP 接続 fd (path ごと)。
uint32_t health_interval_ms
UDP 通信種別の PING 送信間隔 (ミリ秒)。最終 DATA/PING 送信から本値が経過したら PING 送信。0 = 無効。設定ファイルキー: udp_health_interval_ms。
uint32_t health_timeout_ms
UDP 通信種別の受信タイムアウト (ミリ秒)。RECEIVER 側で使用。0 = 無効。設定ファイルキー: udp_health_timeout_ms。
パケットに付与するセッション識別情報。
Definition packet.h:31
int32_t session_tv_nsec
セッション開始時刻 ナノ秒部。
Definition packet.h:35
int64_t service_id
サービス識別子。
Definition packet.h:32
uint32_t session_id
セッション識別子 (乱数)。
Definition packet.h:34
int64_t session_tv_sec
セッション開始時刻 秒部。
Definition packet.h:33
ネットワーク送受信用パケット構造体。
uint32_t session_id
セッション識別子 (NBO)。potrOpenService 時に決定する乱数。
uint16_t flags
パケット種別フラグ (POTR_FLAG_*) (NBO)。
uint32_t seq_num
通番。送信側が付与する連番 (NBO)。
uint16_t payload_len
ペイロード長 (バイト) (NBO)。
PotrWindow send_window
送信ウィンドウ (NACK 再送用)。
int32_t session_tv_nsec
自セッション開始時刻 ナノ秒部。
int64_t session_tv_sec
自セッション開始時刻 秒部。
int active
1: 有効スロット, 0: 空き。
struct sockaddr_in dest_addr[POTR_MAX_PATH]
送信先ソケットアドレス (インデックス = ctx->sock[] の添字)。未使用スロットは sin_family == 0。
uint32_t session_id
自セッション識別子 (乱数)。
PotrPeerId peer_id
外部公開用ピア識別子 (単調増加カウンタから付与)。
PotrMutex send_window_mutex
send_window 保護 (送信・受信・ヘルスチェックスレッド競合)。
int encrypt_enabled
非 0 のとき暗号化有効。設定ファイルに有効な encrypt_key が存在するときに 1 に設定される。
int64_t service_id
サービス ID。
uint8_t encrypt_key[POTR_CRYPTO_KEY_SIZE]
AES-256-GCM 事前共有鍵 (32 バイト)。encrypt_enabled が 0 の場合は未使用。
uint32_t next_seq
送信側: 次に割り当てる通番。受信側: 次に期待する通番。
Definition window.h:35
スライディングウィンドウ管理モジュールの内部ヘッダー。