Document of c-modernization-kit (porter) 1.0.0
Loading...
Searching...
No Matches
potrRecvThread.c
Go to the documentation of this file.
1
13
14#include <string.h>
15#include <inttypes.h>
16
17#ifndef _WIN32
18 #include <sys/socket.h>
19 #include <sys/select.h>
20 #include <netinet/in.h>
21 #include <arpa/inet.h>
22 #include <unistd.h>
23 #include <time.h>
24 #include <errno.h>
25#else /* _WIN32 */
26 #include <winsock2.h>
27 #include <ws2tcpip.h>
28#endif /* _WIN32 */
29
30#include <porter_const.h>
31
32#include "../protocol/packet.h"
33#include "../protocol/seqnum.h"
34#include "../protocol/window.h"
35#include "../potrContext.h"
36#include "../potrPeerTable.h"
37#include "potrRecvThread.h"
40#include "../infra/potrLog.h"
41
42/* 現在時刻をミリ秒単位で返す (単調増加クロック) */
43static uint64_t get_ms(void)
44{
45#ifndef _WIN32
46 struct timespec ts;
47 clock_gettime(CLOCK_MONOTONIC, &ts);
48 return (uint64_t)ts.tv_sec * 1000ULL + (uint64_t)ts.tv_nsec / 1000000ULL;
49#else /* _WIN32 */
50 return (uint64_t)GetTickCount64();
51#endif /* _WIN32 */
52}
53
54/* 前方宣言: 後で定義される関数 */
55static void recv_deliver(struct PotrContext_ *ctx, const uint8_t *payload,
56 size_t payload_len, int compressed);
57static void send_nack(struct PotrContext_ *ctx, uint32_t nack_seq);
58static void send_ping_reply(struct PotrContext_ *ctx, uint32_t req_seq_num);
59static void raw_session_disconnect(struct PotrContext_ *ctx);
60
61#ifndef _WIN32
62 typedef pthread_mutex_t PotrMutexLocal;
63 #define POTR_MUTEX_LOCK_LOCAL(m) pthread_mutex_lock(m)
64 #define POTR_MUTEX_UNLOCK_LOCAL(m) pthread_mutex_unlock(m)
65#else /* _WIN32 */
66 typedef CRITICAL_SECTION PotrMutexLocal;
67 #define POTR_MUTEX_LOCK_LOCAL(m) EnterCriticalSection(m)
68 #define POTR_MUTEX_UNLOCK_LOCAL(m) LeaveCriticalSection(m)
69#endif /* _WIN32 */
70
71/* ================================================================
72 * N:1 モード専用: ピアコンテキストを使ったパケット処理関数群
73 * ================================================================ */
74
75static void n1_send_nack(struct PotrContext_ *ctx, PotrPeerContext *peer,
76 uint32_t nack_seq)
77{
78 PotrPacket nack_pkt;
80 size_t wire_len;
81 int k;
82
83 shdr.service_id = ctx->service.service_id;
84 shdr.session_id = peer->session_id;
85 shdr.session_tv_sec = peer->session_tv_sec;
87
88 if (packet_build_nack(&nack_pkt, &shdr, nack_seq) != POTR_SUCCESS) return;
89
90 if (ctx->service.encrypt_enabled)
91 {
92 uint8_t wire_buf[PACKET_HEADER_SIZE + POTR_CRYPTO_TAG_SIZE];
93 uint8_t nonce[POTR_CRYPTO_NONCE_SIZE];
94 size_t enc_out = POTR_CRYPTO_TAG_SIZE;
95
96 nack_pkt.flags |= htons(POTR_FLAG_ENCRYPTED);
97 nack_pkt.payload_len = htons((uint16_t)POTR_CRYPTO_TAG_SIZE);
98
99 /* ノンス: session_id(4B) + flags(2B, NACK|ENCRYPTED NBO) + ack_num(4B NBO) + padding(2B) */
100 memcpy(nonce, &nack_pkt.session_id, 4);
101 memcpy(nonce + 4, &nack_pkt.flags, 2);
102 memcpy(nonce + 6, &nack_pkt.ack_num, 4);
103 memset(nonce + 10, 0, 2);
104
105 memcpy(wire_buf, &nack_pkt, PACKET_HEADER_SIZE);
106 if (potr_encrypt(wire_buf + PACKET_HEADER_SIZE, &enc_out,
107 NULL, 0,
108 ctx->service.encrypt_key,
109 nonce,
110 wire_buf, PACKET_HEADER_SIZE) != 0)
111 {
112 return;
113 }
114 wire_len = PACKET_HEADER_SIZE + enc_out;
115
116 for (k = 0; k < (int)POTR_MAX_PATH; k++)
117 {
118 if (peer->dest_addr[k].sin_family == 0) continue;
119#ifndef _WIN32
120 sendto(ctx->sock[k], wire_buf, wire_len, 0,
121 (const struct sockaddr *)&peer->dest_addr[k],
122 sizeof(peer->dest_addr[k]));
123#else /* _WIN32 */
124 sendto(ctx->sock[k], (const char *)wire_buf, (int)wire_len, 0,
125 (const struct sockaddr *)&peer->dest_addr[k],
126 sizeof(peer->dest_addr[k]));
127#endif /* _WIN32 */
128 }
129 }
130 else
131 {
132 wire_len = packet_wire_size(&nack_pkt);
133 for (k = 0; k < (int)POTR_MAX_PATH; k++)
134 {
135 if (peer->dest_addr[k].sin_family == 0) continue;
136#ifndef _WIN32
137 sendto(ctx->sock[k], &nack_pkt, wire_len, 0,
138 (const struct sockaddr *)&peer->dest_addr[k],
139 sizeof(peer->dest_addr[k]));
140#else /* _WIN32 */
141 sendto(ctx->sock[k], (const char *)&nack_pkt, (int)wire_len, 0,
142 (const struct sockaddr *)&peer->dest_addr[k],
143 sizeof(peer->dest_addr[k]));
144#endif /* _WIN32 */
145 }
146 }
147}
148
149static void n1_send_reject(struct PotrContext_ *ctx, PotrPeerContext *peer,
150 uint32_t seq_num)
151{
152 PotrPacket reject_pkt;
154 size_t wire_len;
155 int k;
156
157 shdr.service_id = ctx->service.service_id;
158 shdr.session_id = peer->session_id;
159 shdr.session_tv_sec = peer->session_tv_sec;
160 shdr.session_tv_nsec = peer->session_tv_nsec;
161
162 if (packet_build_reject(&reject_pkt, &shdr, seq_num) != POTR_SUCCESS) return;
163
164 if (ctx->service.encrypt_enabled)
165 {
166 uint8_t wire_buf[PACKET_HEADER_SIZE + POTR_CRYPTO_TAG_SIZE];
167 uint8_t nonce[POTR_CRYPTO_NONCE_SIZE];
168 size_t enc_out = POTR_CRYPTO_TAG_SIZE;
169
170 reject_pkt.flags |= htons(POTR_FLAG_ENCRYPTED);
171 reject_pkt.payload_len = htons((uint16_t)POTR_CRYPTO_TAG_SIZE);
172
173 /* ノンス: session_id(4B) + flags(2B, REJECT|ENCRYPTED NBO) + ack_num(4B NBO) + padding(2B) */
174 memcpy(nonce, &reject_pkt.session_id, 4);
175 memcpy(nonce + 4, &reject_pkt.flags, 2);
176 memcpy(nonce + 6, &reject_pkt.ack_num, 4);
177 memset(nonce + 10, 0, 2);
178
179 memcpy(wire_buf, &reject_pkt, PACKET_HEADER_SIZE);
180 if (potr_encrypt(wire_buf + PACKET_HEADER_SIZE, &enc_out,
181 NULL, 0,
182 ctx->service.encrypt_key,
183 nonce,
184 wire_buf, PACKET_HEADER_SIZE) != 0)
185 {
186 return;
187 }
188 wire_len = PACKET_HEADER_SIZE + enc_out;
189
190 for (k = 0; k < (int)POTR_MAX_PATH; k++)
191 {
192 if (peer->dest_addr[k].sin_family == 0) continue;
193#ifndef _WIN32
194 sendto(ctx->sock[k], wire_buf, wire_len, 0,
195 (const struct sockaddr *)&peer->dest_addr[k],
196 sizeof(peer->dest_addr[k]));
197#else /* _WIN32 */
198 sendto(ctx->sock[k], (const char *)wire_buf, (int)wire_len, 0,
199 (const struct sockaddr *)&peer->dest_addr[k],
200 sizeof(peer->dest_addr[k]));
201#endif /* _WIN32 */
202 }
203 }
204 else
205 {
206 wire_len = packet_wire_size(&reject_pkt);
207 for (k = 0; k < (int)POTR_MAX_PATH; k++)
208 {
209 if (peer->dest_addr[k].sin_family == 0) continue;
210#ifndef _WIN32
211 sendto(ctx->sock[k], &reject_pkt, wire_len, 0,
212 (const struct sockaddr *)&peer->dest_addr[k],
213 sizeof(peer->dest_addr[k]));
214#else /* _WIN32 */
215 sendto(ctx->sock[k], (const char *)&reject_pkt, (int)wire_len, 0,
216 (const struct sockaddr *)&peer->dest_addr[k],
217 sizeof(peer->dest_addr[k]));
218#endif /* _WIN32 */
219 }
220 }
221}
222
223static void n1_send_ping_reply(struct PotrContext_ *ctx, PotrPeerContext *peer,
224 uint32_t req_seq_num)
225{
226 PotrPacket reply_pkt;
228 uint32_t my_next_seq;
229 size_t wire_len;
230 int k;
231
232 shdr.service_id = ctx->service.service_id;
233 shdr.session_id = peer->session_id;
234 shdr.session_tv_sec = peer->session_tv_sec;
235 shdr.session_tv_nsec = peer->session_tv_nsec;
236
237#ifndef _WIN32
238 pthread_mutex_lock(&peer->send_window_mutex);
239#else /* _WIN32 */
240 EnterCriticalSection(&peer->send_window_mutex);
241#endif /* _WIN32 */
242 my_next_seq = peer->send_window.next_seq;
243#ifndef _WIN32
244 pthread_mutex_unlock(&peer->send_window_mutex);
245#else /* _WIN32 */
246 LeaveCriticalSection(&peer->send_window_mutex);
247#endif /* _WIN32 */
248
249 if (packet_build_ping(&reply_pkt, &shdr,
250 my_next_seq, req_seq_num + 1U) != POTR_SUCCESS)
251 {
252 return;
253 }
254
255 if (ctx->service.encrypt_enabled)
256 {
257 uint8_t wire_buf[PACKET_HEADER_SIZE + POTR_CRYPTO_TAG_SIZE];
258 uint8_t nonce[POTR_CRYPTO_NONCE_SIZE];
259 size_t enc_out = POTR_CRYPTO_TAG_SIZE;
260
261 reply_pkt.flags |= htons(POTR_FLAG_ENCRYPTED);
262 reply_pkt.payload_len = htons((uint16_t)POTR_CRYPTO_TAG_SIZE);
263
264 /* ノンス: session_id(4B) + flags(2B, PING|ENCRYPTED NBO) + seq_num(4B) + padding(2B) */
265 memcpy(nonce, &reply_pkt.session_id, 4);
266 memcpy(nonce + 4, &reply_pkt.flags, 2);
267 memcpy(nonce + 6, &reply_pkt.seq_num, 4);
268 memset(nonce + 10, 0, 2);
269
270 memcpy(wire_buf, &reply_pkt, PACKET_HEADER_SIZE);
271 if (potr_encrypt(wire_buf + PACKET_HEADER_SIZE, &enc_out,
272 NULL, 0,
273 ctx->service.encrypt_key,
274 nonce,
275 wire_buf, PACKET_HEADER_SIZE) != 0)
276 {
277 return;
278 }
279 wire_len = PACKET_HEADER_SIZE + enc_out;
280
281 for (k = 0; k < (int)POTR_MAX_PATH; k++)
282 {
283 if (peer->dest_addr[k].sin_family == 0) continue;
284#ifndef _WIN32
285 sendto(ctx->sock[k], wire_buf, wire_len, 0,
286 (const struct sockaddr *)&peer->dest_addr[k],
287 sizeof(peer->dest_addr[k]));
288#else /* _WIN32 */
289 sendto(ctx->sock[k], (const char *)wire_buf, (int)wire_len, 0,
290 (const struct sockaddr *)&peer->dest_addr[k],
291 sizeof(peer->dest_addr[k]));
292#endif /* _WIN32 */
293 }
294 }
295 else
296 {
297 wire_len = packet_wire_size(&reply_pkt);
298 for (k = 0; k < (int)POTR_MAX_PATH; k++)
299 {
300 if (peer->dest_addr[k].sin_family == 0) continue;
301#ifndef _WIN32
302 sendto(ctx->sock[k], &reply_pkt, wire_len, 0,
303 (const struct sockaddr *)&peer->dest_addr[k],
304 sizeof(peer->dest_addr[k]));
305#else /* _WIN32 */
306 sendto(ctx->sock[k], (const char *)&reply_pkt, (int)wire_len, 0,
307 (const struct sockaddr *)&peer->dest_addr[k],
308 sizeof(peer->dest_addr[k]));
309#endif /* _WIN32 */
310 }
311 }
312}
313
314/* N:1: リオーダー待機判定 (ピアごとの reorder 状態を使用) */
315static int n1_reorder_gap_ready(PotrPeerContext *peer, uint32_t nack_num)
316{
317 int64_t now_sec;
318 int32_t now_nsec;
319 uint32_t ms = 0; /* peer には global.reorder_timeout_ms へのアクセスがないため ctx から渡さない */
320
321 /* シンプルな実装: 常に即時 (待機なし) */
322 (void)peer;
323 (void)nack_num;
324 (void)now_sec;
325 (void)now_nsec;
326 (void)ms;
327 return 1;
328}
329
330/* N:1: CONNECTED イベントを発火する (health_alive で重複防止) */
332{
333 if (!peer->health_alive)
334 {
335 peer->health_alive = 1;
337 "recv[service_id=%" PRId64 "]: CONNECTED peer=%u",
338 ctx->service.service_id, (unsigned)peer->peer_id);
339 if (ctx->callback != NULL)
340 {
341 ctx->callback(ctx->service.service_id, peer->peer_id,
342 POTR_EVENT_CONNECTED, NULL, 0);
343 }
344 }
345}
346
347/* N:1: データを解凍してコールバックに渡す */
348static void n1_recv_deliver(struct PotrContext_ *ctx, PotrPeerContext *peer,
349 const uint8_t *payload, size_t payload_len,
350 int compressed)
351{
352 if (compressed)
353 {
354 size_t dec_len = ctx->compress_buf_size;
355
356 if (potr_decompress(ctx->compress_buf, &dec_len,
357 payload, payload_len) == 0)
358 {
359 ctx->callback(ctx->service.service_id, peer->peer_id,
360 POTR_EVENT_DATA, ctx->compress_buf, dec_len);
361 }
362 else
363 {
365 "recv[service_id=%" PRId64 "]: peer=%u decompress failed",
366 ctx->service.service_id, (unsigned)peer->peer_id);
367 }
368 }
369 else
370 {
371 ctx->callback(ctx->service.service_id, peer->peer_id,
372 POTR_EVENT_DATA, payload, payload_len);
373 }
374}
375
376/* N:1: ペイロードエレメントをフラグメント結合してコールバックに渡す */
378 const PotrPacket *elem)
379{
380 if (elem->flags & POTR_FLAG_MORE_FRAG)
381 {
382 if (peer->frag_buf_len + elem->payload_len <= ctx->global.max_message_size)
383 {
384 if (peer->frag_buf_len == 0)
385 {
386 peer->frag_compressed = (elem->flags & POTR_FLAG_COMPRESSED) ? 1 : 0;
387 }
388 memcpy(peer->frag_buf + peer->frag_buf_len,
389 elem->payload, elem->payload_len);
390 peer->frag_buf_len += elem->payload_len;
391 }
392 else
393 {
394 peer->frag_buf_len = 0;
395 peer->frag_compressed = 0;
396 }
397 }
398 else if (peer->frag_buf_len > 0)
399 {
400 if (peer->frag_buf_len + elem->payload_len <= ctx->global.max_message_size)
401 {
402 memcpy(peer->frag_buf + peer->frag_buf_len,
403 elem->payload, elem->payload_len);
404 peer->frag_buf_len += elem->payload_len;
405
406 if (ctx->callback != NULL)
407 {
408 n1_recv_deliver(ctx, peer, peer->frag_buf,
409 peer->frag_buf_len, peer->frag_compressed);
410 }
411 }
412 peer->frag_buf_len = 0;
413 peer->frag_compressed = 0;
414 }
415 else
416 {
417 if (ctx->callback != NULL)
418 {
419 n1_recv_deliver(ctx, peer, elem->payload, (size_t)elem->payload_len,
420 (elem->flags & POTR_FLAG_COMPRESSED) ? 1 : 0);
421 }
422 }
423}
424
425/* N:1: 受信ウィンドウからパケットを取り出して配信する */
426static void n1_drain_recv_window(struct PotrContext_ *ctx, PotrPeerContext *peer)
427{
428 PotrPacket pop_pkt;
429
430 while (window_recv_pop(&peer->recv_window, &pop_pkt) == POTR_SUCCESS)
431 {
432 n1_notify_health_alive(ctx, peer);
433
434 if (pop_pkt.flags & POTR_FLAG_PING) continue;
435
436 {
437 size_t offset = 0;
438 PotrPacket elem;
439 while (packet_unpack_next(&pop_pkt, &offset, &elem) == POTR_SUCCESS)
440 {
441 n1_deliver_payload_elem(ctx, peer, &elem);
442 }
443 }
444 }
445}
446
447/* N:1: DATA/PING を recv_window に投入して NACK 処理を行う */
448static void n1_process_outer_pkt(struct PotrContext_ *ctx, PotrPeerContext *peer,
449 const PotrPacket *pkt)
450{
451 uint32_t nack_num;
452 uint32_t stretch;
453
454 if (window_recv_push(&peer->recv_window, pkt) != POTR_SUCCESS)
455 {
457 "recv[service_id=%" PRId64 "]: peer=%u recv_window full, dropping seq=%u",
458 ctx->service.service_id, (unsigned)peer->peer_id,
459 (unsigned)pkt->seq_num);
460 return;
461 }
462
463 stretch = (uint32_t)(pkt->seq_num - peer->recv_window.base_seq) + 1U;
464 if (stretch * 10U >= (uint32_t)peer->recv_window.window_size * 8U)
465 {
467 "recv[service_id=%" PRId64 "]: peer=%u recv_window utilization high (%u/%u)",
468 ctx->service.service_id, (unsigned)peer->peer_id,
469 (unsigned)stretch, (unsigned)peer->recv_window.window_size);
470 }
471
472 if (window_recv_needs_nack(&peer->recv_window, &nack_num))
473 {
474 if (n1_reorder_gap_ready(peer, nack_num))
475 {
477 "recv[service_id=%" PRId64 "]: peer=%u NACK seq=%u",
478 ctx->service.service_id, (unsigned)peer->peer_id,
479 (unsigned)nack_num);
480 n1_send_nack(ctx, peer, nack_num);
481 }
482 }
483 else
484 {
485 peer->reorder_pending = 0;
486 }
487
488 n1_drain_recv_window(ctx, peer);
489
490 if (window_recv_needs_nack(&peer->recv_window, &nack_num))
491 {
492 if (n1_reorder_gap_ready(peer, nack_num))
493 {
495 "recv[service_id=%" PRId64 "]: peer=%u NACK seq=%u (post-drain)",
496 ctx->service.service_id, (unsigned)peer->peer_id,
497 (unsigned)nack_num);
498 n1_send_nack(ctx, peer, nack_num);
499 }
500 }
501 else
502 {
503 peer->reorder_pending = 0;
504 }
505}
506
507/* N:1: セッション照合・更新 (ピアコンテキスト版) */
509 PotrPeerContext *peer,
510 const PotrPacket *pkt)
511{
512 if (!peer->peer_session_known)
513 {
514 peer->peer_session_id = pkt->session_id;
517 peer->peer_session_known = 1;
518 peer->reorder_pending = 0;
519 window_init(&peer->recv_window, pkt->seq_num,
521 return 1;
522 }
523
524 if (pkt->session_tv_sec > peer->peer_session_tv_sec)
525 {
526 /* 新セッション: フォールスルーして採用 */
527 }
528 else if (pkt->session_tv_sec < peer->peer_session_tv_sec)
529 {
530 return 0;
531 }
532 else if (pkt->session_tv_nsec > peer->peer_session_tv_nsec)
533 {
534 /* 新セッション: フォールスルーして採用 */
535 }
536 else if (pkt->session_tv_nsec < peer->peer_session_tv_nsec)
537 {
538 return 0;
539 }
540 else if (pkt->session_id > peer->peer_session_id)
541 {
542 /* 新セッション: フォールスルーして採用 */
543 }
544 else
545 {
546 return (pkt->session_id == peer->peer_session_id) ? 1 : 0;
547 }
548
549 peer->peer_session_id = pkt->session_id;
552 peer->reorder_pending = 0;
553 window_init(&peer->recv_window, pkt->seq_num,
555 return 1;
556}
557
558/* N:1: パスごとの最終受信時刻を更新し、未知のパスを学習する */
560 const struct sockaddr_in *sender_addr,
561 int path_idx)
562{
563#ifndef _WIN32
564 struct timespec ts;
565 int64_t s;
566 int32_t ns;
567 clock_gettime(CLOCK_MONOTONIC, &ts);
568 s = (int64_t)ts.tv_sec;
569 ns = (int32_t)ts.tv_nsec;
570#else /* _WIN32 */
571 ULONGLONG ms;
572 int64_t s;
573 int32_t ns;
574 ms = GetTickCount64();
575 s = (int64_t)(ms / 1000ULL);
576 ns = (int32_t)((ms % 1000ULL) * 1000000UL);
577#endif /* _WIN32 */
578
579 peer->last_recv_tv_sec = s;
580 peer->last_recv_tv_nsec = ns;
581
582 if (peer->dest_addr[path_idx].sin_family == AF_INET)
583 {
584 /* 既知パス: ポートと最終受信時刻を更新 */
585 peer->dest_addr[path_idx].sin_port = sender_addr->sin_port;
586 peer->path_last_recv_sec[path_idx] = s;
587 peer->path_last_recv_nsec[path_idx] = ns;
588 }
589 else
590 {
591 /* 新規パス: インデックス path_idx のスロットに直接記録 */
592 peer->dest_addr[path_idx] = *sender_addr;
593 peer->path_last_recv_sec[path_idx] = s;
594 peer->path_last_recv_nsec[path_idx] = ns;
595 peer->n_paths++;
597 "n1_update_path_recv: peer=%u path %d learned",
598 (unsigned)peer->peer_id, path_idx);
599 }
600}
601
602/* N:1: select() タイムアウト時にヘルスタイムアウトを確認する */
603static void n1_check_health_timeout(struct PotrContext_ *ctx)
604{
605#ifndef _WIN32
606 struct timespec ts;
607 int64_t now_sec;
608 int32_t now_nsec;
609 clock_gettime(CLOCK_MONOTONIC, &ts);
610 now_sec = (int64_t)ts.tv_sec;
611 now_nsec = (int32_t)ts.tv_nsec;
612#else /* _WIN32 */
613 ULONGLONG ms;
614 int64_t now_sec;
615 int32_t now_nsec;
616 ms = GetTickCount64();
617 now_sec = (int64_t)(ms / 1000ULL);
618 now_nsec = (int32_t)((ms % 1000ULL) * 1000000UL);
619#endif /* _WIN32 */
620 int i;
621 int k;
622
623 if (ctx->global.health_timeout_ms == 0) return;
624
626
627 for (i = 0; i < ctx->max_peers; i++)
628 {
629 int64_t elapsed_ms;
630
631 if (!ctx->peers[i].active) continue;
632 if (!ctx->peers[i].health_alive) continue;
633
634 /* パス単位のタイムアウト: 不通パスを dest_addr から削除する */
635 for (k = 0; k < (int)POTR_MAX_PATH; k++)
636 {
637 int64_t path_elapsed;
638
639 if (ctx->peers[i].dest_addr[k].sin_family == 0) continue; /* 未使用 */
640 if (ctx->peers[i].path_last_recv_sec[k] == 0) continue; /* 初回受信前 */
641
642 path_elapsed = (now_sec - ctx->peers[i].path_last_recv_sec[k]) * 1000LL
643 + (now_nsec - ctx->peers[i].path_last_recv_nsec[k]) / 1000000L;
644
645 if (path_elapsed >= (int64_t)ctx->global.health_timeout_ms)
646 {
647 peer_path_clear(ctx, &ctx->peers[i], k);
648 }
649 }
650
651 /* ピア単位のタイムアウト: 全パス消滅、または最終受信から切断判定 */
652 if (ctx->peers[i].last_recv_tv_sec == 0) continue;
653
654 elapsed_ms = (now_sec - ctx->peers[i].last_recv_tv_sec) * 1000LL
655 + (now_nsec - ctx->peers[i].last_recv_tv_nsec) / 1000000L;
656
657 if (elapsed_ms >= (int64_t)ctx->global.health_timeout_ms)
658 {
659 PotrPeerId dead_id = ctx->peers[i].peer_id;
660
661 ctx->peers[i].health_alive = 0;
663 "recv[service_id=%" PRId64 "]: peer=%u DISCONNECTED (timeout %lldms)",
664 ctx->service.service_id, (unsigned)dead_id,
665 (long long)elapsed_ms);
666
667 if (ctx->callback != NULL)
668 {
669 ctx->callback(ctx->service.service_id, dead_id,
670 POTR_EVENT_DISCONNECTED, NULL, 0);
671 }
672
673 peer_free(ctx, &ctx->peers[i]);
674 }
675 }
676
678}
679
680/* ================================================================
681 * N:1 モード専用ここまで
682 * ================================================================ */
683
684/* 送信元 IP が期待アドレスのいずれかと一致するか確認する。
685 N:1 モード: src_port 指定時は送信元ポートのみでフィルタリング。未指定時は全許可。
686 UNICAST_BIDIR SENDER: 受信パケットの送信元は RECEIVER (dst_addr_resolved) と照合する。
687 UNICAST_BIDIR RECEIVER: 受信パケットの送信元は SENDER (src_addr_resolved) と照合する。
688 その他: src_addr_resolved と照合する。src_addr が未設定の場合は常に 1 (合格) を返す。 */
689static int check_src_addr(const struct PotrContext_ *ctx,
690 const struct sockaddr_in *sender)
691{
692 int i;
693
694 /* N:1 モード: src_port 指定時はポートのみでフィルタリング、未指定時は全許可 */
695 if (ctx->is_multi_peer)
696 {
697 if (ctx->service.src_port != 0)
698 return (ntohs(sender->sin_port) == ctx->service.src_port) ? 1 : 0;
699 return 1;
700 }
701
703 {
704 if (ctx->role == POTR_ROLE_SENDER)
705 {
706 /* SENDER が受け取るパケット: RECEIVER (dst_addr) から来る */
707 if (ctx->service.dst_addr[0][0] == '\0') return 1;
708 for (i = 0; i < ctx->n_path; i++)
709 {
710 if (sender->sin_addr.s_addr == ctx->dst_addr_resolved[i].s_addr)
711 return 1;
712 }
713 }
714 else
715 {
716 /* RECEIVER が受け取るパケット: SENDER (src_addr) から来る */
717 if (ctx->service.src_addr[0][0] == '\0') return 1;
718 for (i = 0; i < ctx->n_path; i++)
719 {
720 if (sender->sin_addr.s_addr == ctx->src_addr_resolved[i].s_addr)
721 return 1;
722 }
723 }
724 return 0;
725 }
726
727 if (ctx->service.src_addr[0][0] == '\0')
728 {
729 return 1;
730 }
731 for (i = 0; i < ctx->n_path; i++)
732 {
733 if (sender->sin_addr.s_addr == ctx->src_addr_resolved[i].s_addr)
734 {
735 return 1;
736 }
737 }
738 return 0;
739}
740
741/* セッションの採用判定を行い、必要であればコンテキストの相手セッション情報を更新する。
742 採用すべきセッションなら 1、破棄すべき旧セッションなら 0 を返す。 */
744 const PotrPacket *pkt)
745{
746 if (!ctx->peer_session_known)
747 {
748 /* 初回受信 (または FIN/タイムアウト後の再接続): セッション採用 + ウィンドウをリセット。
749 pkt->seq_num で初期化することで、送信者の現在位置に直接同期し
750 NACK/REJECT サイクルなしに再加入できる。
751 DATA 着信時: window_init(DATA.seq_num) → push → pop → 即時 CONNECTED
752 PING 着信時: window_init(PING.seq_num) → gap スキャン範囲がゼロ → NACK なし
753 FIN/タイムアウト後は送信者が同一セッションのまま任意の seq から
754 再開する可能性があるため pkt->seq_num を使用する。 */
755 ctx->peer_session_id = pkt->session_id;
758 ctx->peer_session_known = 1;
759 ctx->reorder_pending = 0;
761 "recv[service_id=%" PRId64 "]: new session (first contact), new_id=%u seq=%u",
762 ctx->service.service_id,
763 pkt->session_id, (unsigned)pkt->seq_num);
764 window_init(&ctx->recv_window, pkt->seq_num,
766 return 1;
767 }
768
769 /* (session_tv_sec, session_tv_nsec, session_id) の辞書順で新旧を判定する。
770 - pkt > 既知セッション: 新セッション。return せずにフォールスルーし、
771 関数末尾の「新セッション採用」ブロックで採用処理を行う。
772 - pkt < 既知セッション: 旧セッション。即 return 0 で破棄する。
773 - pkt == 既知セッション: 同一セッション。即 return 1 で通常受信を継続する。
774 新セッションと判定された分岐は LOG のみで return しないため、
775 if-else チェーンを抜けた後に必ず末尾の採用ブロックに到達する。 */
776 if (pkt->session_tv_sec > ctx->peer_session_tv_sec)
777 {
778 /* 新セッション (tv_sec が大): フォールスルーして採用 */
780 "recv[service_id=%" PRId64 "]: new session (tv_sec %lld > %lld)"
781 ", old_id=%u new_id=%u",
782 ctx->service.service_id,
783 (long long)pkt->session_tv_sec, (long long)ctx->peer_session_tv_sec,
784 ctx->peer_session_id, pkt->session_id);
785 }
786 else if (pkt->session_tv_sec < ctx->peer_session_tv_sec)
787 {
788 return 0; /* 旧セッション (tv_sec が小): 破棄 */
789 }
790 else if (pkt->session_tv_nsec > ctx->peer_session_tv_nsec)
791 {
792 /* 新セッション (tv_sec 同一・tv_nsec が大): フォールスルーして採用 */
794 "recv[service_id=%" PRId64 "]: new session (tv_nsec %d > %d)"
795 ", old_id=%u new_id=%u",
796 ctx->service.service_id,
798 ctx->peer_session_id, pkt->session_id);
799 }
800 else if (pkt->session_tv_nsec < ctx->peer_session_tv_nsec)
801 {
802 return 0; /* 旧セッション (tv_sec 同一・tv_nsec が小): 破棄 */
803 }
804 else if (pkt->session_id > ctx->peer_session_id)
805 {
806 /* 新セッション (タイムスタンプ完全一致・session_id が大): フォールスルーして採用 */
808 "recv[service_id=%" PRId64 "]: new session (id tiebreak %u > %u)",
809 ctx->service.service_id,
810 pkt->session_id, ctx->peer_session_id);
811 }
812 else
813 {
814 /* ここに到達するのは tv_sec == tv_sec かつ tv_nsec == tv_nsec かつ
815 session_id <= peer_session_id の場合のみ。
816 新セッション分岐はこの else には入らない。 */
817 if (pkt->session_id == ctx->peer_session_id)
818 {
819 return 1; /* 同一セッション: 採用済みのため再初期化不要 */
820 }
821 else
822 {
823 return 0; /* 旧セッション (タイムスタンプ完全一致・session_id が小): 破棄 */
824 }
825 }
826
827 /* 新セッション採用: コンテキストを更新しウィンドウ・リオーダー状態をリセットする。
828 最初に受信したパケットの seq_num で初期化することで、送信者が先行して
829 送信済みの seq に直接同期し、不要な NACK/REJECT サイクルを発生させない。
830 送信者を先に起動して受信者が後から参加した場合も同様に機能する。 */
831 ctx->peer_session_id = pkt->session_id;
834 ctx->reorder_pending = 0;
835 window_init(&ctx->recv_window, pkt->seq_num,
837 return 1;
838}
839
841#define POTR_NACK_DEDUP_MS 200U
842
843/* 現在時刻をミリ秒単位で返す (単調増加クロック) */
844static uint64_t get_ms_mono(void)
845{
846#ifndef _WIN32
847 struct timespec ts;
848 clock_gettime(CLOCK_MONOTONIC, &ts);
849 return (uint64_t)ts.tv_sec * 1000ULL + (uint64_t)ts.tv_nsec / 1000000ULL;
850#else /* _WIN32 */
851 return (uint64_t)GetTickCount64();
852#endif /* _WIN32 */
853}
854
855/* 現在の CLOCK_MONOTONIC 時刻を取得する */
856static void get_monotonic(int64_t *tv_sec, int32_t *tv_nsec)
857{
858#ifndef _WIN32
859 struct timespec ts;
860 clock_gettime(CLOCK_MONOTONIC, &ts);
861 *tv_sec = (int64_t)ts.tv_sec;
862 *tv_nsec = (int32_t)ts.tv_nsec;
863#else /* _WIN32 */
864 ULONGLONG ms = GetTickCount64();
865 *tv_sec = (int64_t)(ms / 1000ULL);
866 *tv_nsec = (int32_t)((ms % 1000ULL) * 1000000UL);
867#endif /* _WIN32 */
868}
869
870/* パスごとの最終受信時刻と peer_port を更新する */
871static void update_path_recv(struct PotrContext_ *ctx,
872 int path_idx,
873 const struct sockaddr_in *sender)
874{
876 get_monotonic(&ctx->path_last_recv_sec[path_idx],
877 &ctx->path_last_recv_nsec[path_idx]);
878 ctx->peer_port[path_idx] = sender->sin_port; /* NBO のまま格納 */
879
880 /* unicast_bidir で送信先アドレスが未確定の場合は受信パケットの送信元から動的学習する。
881 - src_addr 省略 (動的 1:1 RECEIVER): IP アドレスが 0 → 送信元 IP で更新
882 - src_port=0 (エフェメラルポート動的学習): ポートが 0 → 送信元ポートで更新 */
884 {
885 if (ctx->service.src_addr[0][0] == '\0'
886 && ctx->dest_addr[path_idx].sin_addr.s_addr == 0)
887 {
888 ctx->dest_addr[path_idx].sin_addr = sender->sin_addr; /* NBO */
889 }
890 if (ctx->dest_addr[path_idx].sin_port == 0)
891 {
892 ctx->dest_addr[path_idx].sin_port = sender->sin_port; /* NBO */
893 }
894 }
895}
896
897/* health_alive が dead → alive になった場合に CONNECTED イベントを発火する。
898 ヘルスチェック有効/無効に関わらず health_alive フラグで接続状態を統一管理する。 */
899static void notify_health_alive(struct PotrContext_ *ctx)
900{
901 if (!ctx->health_alive)
902 {
903 ctx->health_alive = 1;
905 "recv[service_id=%" PRId64 "]: CONNECTED",
906 ctx->service.service_id);
907 if (ctx->callback != NULL)
908 {
910 POTR_EVENT_CONNECTED, NULL, 0);
911 }
912 }
913}
914
915/* タイムアウト時に経過時間を確認し、必要なら peer_port クリアと DISCONNECTED イベントを発火する */
916static void check_health_timeout(struct PotrContext_ *ctx)
917{
918 int64_t now_sec;
919 int32_t now_nsec;
920 int i;
921
922 if (ctx->global.health_timeout_ms == 0) return;
923
924 get_monotonic(&now_sec, &now_nsec);
925
926 /* パスごとのタイムアウト: peer_port をクリア */
927 for (i = 0; i < ctx->n_path; i++)
928 {
929 int64_t elapsed_ms;
930
931 if (ctx->path_last_recv_sec[i] == 0) continue;
932
933 elapsed_ms = (now_sec - ctx->path_last_recv_sec[i]) * 1000LL
934 + (now_nsec - ctx->path_last_recv_nsec[i]) / 1000000L;
935
936 if (elapsed_ms >= (int64_t)ctx->global.health_timeout_ms)
937 {
938 ctx->peer_port[i] = 0;
939 ctx->path_last_recv_sec[i] = 0;
940 /* unicast_bidir で動的学習したアドレス・ポートをリセットする (再接続を許可) */
942 {
943 if (ctx->service.src_addr[0][0] == '\0')
944 {
945 /* 動的 1:1 RECEIVER: 学習した送信元 IP をリセット */
946 ctx->dest_addr[i].sin_addr.s_addr = 0;
947 }
948 if (ctx->service.src_port == 0)
949 {
950 /* エフェメラルポート動的学習: ポートをリセット */
951 ctx->dest_addr[i].sin_port = 0;
952 }
953 }
954 }
955 }
956
957 /* 全体の health_alive 判定 */
958 if (!ctx->health_alive || ctx->last_recv_tv_sec == 0) return;
959
960 {
961 int64_t elapsed_ms;
962 elapsed_ms = (now_sec - ctx->last_recv_tv_sec) * 1000LL
963 + (now_nsec - ctx->last_recv_tv_nsec) / 1000000L;
964
965 if (elapsed_ms >= (int64_t)ctx->global.health_timeout_ms)
966 {
967 ctx->health_alive = 0;
969 "recv[service_id=%" PRId64 "]: DISCONNECTED (timeout %lldms >= %ums)",
970 ctx->service.service_id,
971 (long long)elapsed_ms,
972 (unsigned)ctx->global.health_timeout_ms);
973 if (ctx->callback != NULL)
974 {
976 POTR_EVENT_DISCONNECTED, NULL, 0);
977 }
978
979 /* FIN と同様にセッション状態をリセットして次の接続を受け入れ可能にする。
980 peer_session_known をクリアすることで、送信者が同一セッションのまま
981 復帰した場合でも window_init を経由して受信ウィンドウを初期化し、
982 前セッションの next_seq が gap スキャンに影響しないようにする。 */
983 ctx->peer_session_known = 0;
984 ctx->reorder_pending = 0;
985 ctx->last_recv_tv_sec = 0;
986 window_init(&ctx->recv_window, 0,
988 }
989 }
990}
991
992/* 欠番 nack_num に対してリオーダー待機が完了しているか確認する。
993 返値: 1 = 処理進行 (NACK/DISCONNECT を発行すべき)、0 = まだ待機中。
994 reorder_timeout_ms == 0 の場合は常に 1 を返す (即時)。
995 新しいギャップまたは欠番通番が変わった場合はタイマーをリセットして 0 を返す。
996 同一欠番でタイムアウト経過後は reorder_pending を 0 にクリアして 1 を返す。 */
997static int reorder_gap_ready(struct PotrContext_ *ctx, uint32_t nack_num)
998{
999 int64_t now_sec;
1000 int32_t now_nsec;
1001 uint32_t ms;
1002
1003 if (ctx->global.reorder_timeout_ms == 0U) return 1;
1004
1005 ms = ctx->global.reorder_timeout_ms;
1006
1007 /* 新しいギャップ、または欠番通番が変わった: タイマーをリセットして待機開始 */
1008 if (!ctx->reorder_pending || ctx->reorder_nack_num != nack_num)
1009 {
1010 uint32_t effective_ms;
1011 get_monotonic(&now_sec, &now_nsec);
1012
1013 /* マルチキャスト/ブロードキャスト通常モードでは NACK 送出タイミングを分散させる。
1014 複数受信者が同一欠番を同時に NACK すると送信者側で輻輳が発生するため、
1015 タイマー値を reorder_timeout_ms の 100%〜200% の範囲でジッタを付加する。
1016 ジッタ源: now_nsec の下位ビット (外部 RNG 不要・移植性高)。
1017 RAW 系は POTR_TYPE_MULTICAST_RAW / BROADCAST_RAW であり条件に該当しないため対象外。 */
1018 effective_ms = ms;
1019 if (ctx->service.type == POTR_TYPE_MULTICAST
1020 || ctx->service.type == POTR_TYPE_BROADCAST)
1021 {
1022 effective_ms = ms + (uint32_t)((uint32_t)now_nsec % ms);
1023 }
1024
1025 ctx->reorder_pending = 1;
1026 ctx->reorder_nack_num = nack_num;
1027 ctx->reorder_deadline_sec = now_sec + (int64_t)(effective_ms / 1000U);
1028 ctx->reorder_deadline_nsec = now_nsec + (int32_t)((effective_ms % 1000U) * 1000000U);
1029 if (ctx->reorder_deadline_nsec >= 1000000000L)
1030 {
1031 ctx->reorder_deadline_sec++;
1032 ctx->reorder_deadline_nsec -= 1000000000L;
1033 }
1034 return 0; /* 待機開始 */
1035 }
1036
1037 /* 同一欠番: タイムアウト確認 */
1038 get_monotonic(&now_sec, &now_nsec);
1039 if (now_sec > ctx->reorder_deadline_sec
1040 || (now_sec == ctx->reorder_deadline_sec
1041 && now_nsec >= ctx->reorder_deadline_nsec))
1042 {
1043 ctx->reorder_pending = 0;
1044 return 1; /* タイムアウト: 処理進行 */
1045 }
1046
1047 return 0; /* まだ待機中 */
1048}
1049
1050/* select() タイムアウト時に、リオーダー待機中の欠番がタイムアウトしていれば処理する。
1051 通常モード: NACK を送出する。
1052 RAW モード: DISCONNECTED を発行してセッションをリセットし、次のパケットで再同期する。 */
1053static void check_reorder_timeout(struct PotrContext_ *ctx)
1054{
1055 if (!ctx->reorder_pending) return;
1056 if (!reorder_gap_ready(ctx, ctx->reorder_nack_num)) return;
1057
1058 /* reorder_gap_ready が 1 を返した時点で reorder_pending は既にクリア済み */
1059 if (potr_is_raw_type(ctx->service.type))
1060 {
1061 /* RAW モード: DISCONNECTED を発行してセッション状態をリセットする。
1062 次のパケット受信時に check_and_update_session で window_init が呼ばれ
1063 自然に再同期する。 */
1065 ctx->peer_session_known = 0;
1066 ctx->last_recv_tv_sec = 0;
1067 window_init(&ctx->recv_window, 0,
1069 }
1070 else
1071 {
1073 "recv[service_id=%" PRId64 "]: NACK seq=%u (reorder timeout)",
1074 ctx->service.service_id, (unsigned)ctx->reorder_nack_num);
1075 send_nack(ctx, ctx->reorder_nack_num);
1076 }
1077}
1078
1079/* NACK パケットを全パスへ送信する */
1080static void send_nack(struct PotrContext_ *ctx, uint32_t nack_seq)
1081{
1082 PotrPacket nack_pkt;
1084 size_t wire_len;
1085 int i;
1086
1087 shdr.service_id = ctx->service.service_id;
1088 shdr.session_id = ctx->session_id;
1089 shdr.session_tv_sec = ctx->session_tv_sec;
1090 shdr.session_tv_nsec = ctx->session_tv_nsec;
1091
1092 if (packet_build_nack(&nack_pkt, &shdr, nack_seq) != POTR_SUCCESS) return;
1093
1094 if (ctx->service.encrypt_enabled)
1095 {
1096 uint8_t wire_buf[PACKET_HEADER_SIZE + POTR_CRYPTO_TAG_SIZE];
1097 uint8_t nonce[POTR_CRYPTO_NONCE_SIZE];
1098 size_t enc_out = POTR_CRYPTO_TAG_SIZE;
1099
1100 nack_pkt.flags |= htons(POTR_FLAG_ENCRYPTED);
1101 nack_pkt.payload_len = htons((uint16_t)POTR_CRYPTO_TAG_SIZE);
1102
1103 /* ノンス: session_id(4B) + flags(2B, NACK|ENCRYPTED NBO) + ack_num(4B NBO) + padding(2B) */
1104 memcpy(nonce, &nack_pkt.session_id, 4);
1105 memcpy(nonce + 4, &nack_pkt.flags, 2);
1106 memcpy(nonce + 6, &nack_pkt.ack_num, 4);
1107 memset(nonce + 10, 0, 2);
1108
1109 memcpy(wire_buf, &nack_pkt, PACKET_HEADER_SIZE);
1110 if (potr_encrypt(wire_buf + PACKET_HEADER_SIZE, &enc_out,
1111 NULL, 0,
1112 ctx->service.encrypt_key,
1113 nonce,
1114 wire_buf, PACKET_HEADER_SIZE) != 0)
1115 {
1116 return;
1117 }
1118 wire_len = PACKET_HEADER_SIZE + enc_out;
1119
1120 for (i = 0; i < ctx->n_path; i++)
1121 {
1123 {
1124#ifndef _WIN32
1125 sendto(ctx->sock[i], wire_buf, wire_len, 0,
1126 (const struct sockaddr *)&ctx->dest_addr[i],
1127 sizeof(ctx->dest_addr[i]));
1128#else /* _WIN32 */
1129 sendto(ctx->sock[i], (const char *)wire_buf, (int)wire_len, 0,
1130 (const struct sockaddr *)&ctx->dest_addr[i],
1131 sizeof(ctx->dest_addr[i]));
1132#endif /* _WIN32 */
1133 }
1134 else
1135 {
1136 struct sockaddr_in dest;
1137 uint16_t port;
1138
1139 if (ctx->service.src_port != 0)
1140 port = htons(ctx->service.src_port);
1141 else
1142 port = ctx->peer_port[i];
1143
1144 if (port == 0) continue;
1145
1146 memset(&dest, 0, sizeof(dest));
1147 dest.sin_family = AF_INET;
1148 dest.sin_addr = ctx->src_addr_resolved[i];
1149 dest.sin_port = port;
1150#ifndef _WIN32
1151 sendto(ctx->sock[i], wire_buf, wire_len, 0,
1152 (const struct sockaddr *)&dest, sizeof(dest));
1153#else /* _WIN32 */
1154 sendto(ctx->sock[i], (const char *)wire_buf, (int)wire_len, 0,
1155 (const struct sockaddr *)&dest, sizeof(dest));
1156#endif /* _WIN32 */
1157 }
1158 }
1159 }
1160 else
1161 {
1162 wire_len = packet_wire_size(&nack_pkt);
1163
1164 for (i = 0; i < ctx->n_path; i++)
1165 {
1166 /* UNICAST_BIDIR: dest_addr[i] (dst_addr:dst_port) へ直接送信する。
1167 通常 unicast: src_addr_resolved[i]:src_port または peer_port へ送信する。 */
1169 {
1170#ifndef _WIN32
1171 sendto(ctx->sock[i], &nack_pkt, wire_len, 0,
1172 (const struct sockaddr *)&ctx->dest_addr[i],
1173 sizeof(ctx->dest_addr[i]));
1174#else /* _WIN32 */
1175 sendto(ctx->sock[i], (const char *)&nack_pkt, (int)wire_len, 0,
1176 (const struct sockaddr *)&ctx->dest_addr[i],
1177 sizeof(ctx->dest_addr[i]));
1178#endif /* _WIN32 */
1179 }
1180 else
1181 {
1182 struct sockaddr_in dest;
1183 uint16_t port;
1184
1185 if (ctx->service.src_port != 0)
1186 {
1187 port = htons(ctx->service.src_port);
1188 }
1189 else
1190 {
1191 port = ctx->peer_port[i]; /* NBO */
1192 }
1193
1194 if (port == 0) continue; /* ポート未観測のパスは送れない */
1195
1196 memset(&dest, 0, sizeof(dest));
1197 dest.sin_family = AF_INET;
1198 dest.sin_addr = ctx->src_addr_resolved[i];
1199 dest.sin_port = port;
1200
1201#ifndef _WIN32
1202 sendto(ctx->sock[i], &nack_pkt, wire_len, 0,
1203 (const struct sockaddr *)&dest, sizeof(dest));
1204#else /* _WIN32 */
1205 sendto(ctx->sock[i], (const char *)&nack_pkt, (int)wire_len, 0,
1206 (const struct sockaddr *)&dest, sizeof(dest));
1207#endif /* _WIN32 */
1208 }
1209 }
1210 }
1211}
1212
1213/* UNICAST_BIDIR 専用: PING 応答パケットを全パスへ送信する。
1214 req_seq_num: 受信した PING 要求の seq_num (ack_num フィールドに格納して返す)。 */
1215static void send_ping_reply(struct PotrContext_ *ctx, uint32_t req_seq_num)
1216{
1217 PotrPacket reply_pkt;
1219 uint32_t my_next_seq;
1220 size_t wire_len;
1221 int i;
1222
1223 shdr.service_id = ctx->service.service_id;
1224 shdr.session_id = ctx->session_id;
1225 shdr.session_tv_sec = ctx->session_tv_sec;
1226 shdr.session_tv_nsec = ctx->session_tv_nsec;
1227
1228 /* send_window.next_seq を排他制御下で読み取る
1229 (送信スレッド・ヘルスチェックスレッドと競合するため) */
1230#ifndef _WIN32
1231 pthread_mutex_lock(&ctx->send_window_mutex);
1232#else /* _WIN32 */
1233 EnterCriticalSection(&ctx->send_window_mutex);
1234#endif /* _WIN32 */
1235
1236 my_next_seq = ctx->send_window.next_seq;
1237
1238#ifndef _WIN32
1239 pthread_mutex_unlock(&ctx->send_window_mutex);
1240#else /* _WIN32 */
1241 LeaveCriticalSection(&ctx->send_window_mutex);
1242#endif /* _WIN32 */
1243
1244 /* ack_num=0 は PING 要求と区別できないため req_seq_num+1 を格納する。
1245 受信側は ack_num != 0 で応答を判定するため値の厳密な一致は不要。 */
1246 if (packet_build_ping(&reply_pkt, &shdr,
1247 my_next_seq, req_seq_num + 1U) != POTR_SUCCESS)
1248 {
1249 return;
1250 }
1251
1253 "recv[service_id=%" PRId64 "]: PING reply sent (req_seq=%u my_next_seq=%u)",
1254 ctx->service.service_id,
1255 (unsigned)req_seq_num, (unsigned)my_next_seq);
1256
1257 if (ctx->service.encrypt_enabled)
1258 {
1259 uint8_t wire_buf[PACKET_HEADER_SIZE + POTR_CRYPTO_TAG_SIZE];
1260 uint8_t nonce[POTR_CRYPTO_NONCE_SIZE];
1261 size_t enc_out = POTR_CRYPTO_TAG_SIZE;
1262
1263 reply_pkt.flags |= htons(POTR_FLAG_ENCRYPTED);
1264 reply_pkt.payload_len = htons((uint16_t)POTR_CRYPTO_TAG_SIZE);
1265
1266 /* ノンス: session_id(4B) + flags(2B, PING|ENCRYPTED NBO) + seq_num(4B) + padding(2B) */
1267 memcpy(nonce, &reply_pkt.session_id, 4);
1268 memcpy(nonce + 4, &reply_pkt.flags, 2);
1269 memcpy(nonce + 6, &reply_pkt.seq_num, 4);
1270 memset(nonce + 10, 0, 2);
1271
1272 memcpy(wire_buf, &reply_pkt, PACKET_HEADER_SIZE);
1273 if (potr_encrypt(wire_buf + PACKET_HEADER_SIZE, &enc_out,
1274 NULL, 0,
1275 ctx->service.encrypt_key,
1276 nonce,
1277 wire_buf, PACKET_HEADER_SIZE) != 0)
1278 {
1279 return;
1280 }
1281 wire_len = PACKET_HEADER_SIZE + enc_out;
1282
1283 for (i = 0; i < ctx->n_path; i++)
1284 {
1285#ifndef _WIN32
1286 sendto(ctx->sock[i], wire_buf, wire_len, 0,
1287 (const struct sockaddr *)&ctx->dest_addr[i],
1288 sizeof(ctx->dest_addr[i]));
1289#else /* _WIN32 */
1290 sendto(ctx->sock[i], (const char *)wire_buf, (int)wire_len, 0,
1291 (const struct sockaddr *)&ctx->dest_addr[i],
1292 sizeof(ctx->dest_addr[i]));
1293#endif /* _WIN32 */
1294 }
1295 }
1296 else
1297 {
1298 wire_len = packet_wire_size(&reply_pkt);
1299
1300 for (i = 0; i < ctx->n_path; i++)
1301 {
1302#ifndef _WIN32
1303 sendto(ctx->sock[i], &reply_pkt, wire_len, 0,
1304 (const struct sockaddr *)&ctx->dest_addr[i],
1305 sizeof(ctx->dest_addr[i]));
1306#else /* _WIN32 */
1307 sendto(ctx->sock[i], (const char *)&reply_pkt, (int)wire_len, 0,
1308 (const struct sockaddr *)&ctx->dest_addr[i],
1309 sizeof(ctx->dest_addr[i]));
1310#endif /* _WIN32 */
1311 }
1312 }
1313}
1314
1315/* REJECT パケットを全パスへ送信する */
1316static void send_reject(struct PotrContext_ *ctx, uint32_t seq_num)
1317{
1318 PotrPacket reject_pkt;
1320 size_t wire_len;
1321 int i;
1322
1323 shdr.service_id = ctx->service.service_id;
1324 shdr.session_id = ctx->session_id;
1325 shdr.session_tv_sec = ctx->session_tv_sec;
1326 shdr.session_tv_nsec = ctx->session_tv_nsec;
1327
1328 if (packet_build_reject(&reject_pkt, &shdr, seq_num) != POTR_SUCCESS) return;
1329
1330 if (ctx->service.encrypt_enabled)
1331 {
1332 uint8_t wire_buf[PACKET_HEADER_SIZE + POTR_CRYPTO_TAG_SIZE];
1333 uint8_t nonce[POTR_CRYPTO_NONCE_SIZE];
1334 size_t enc_out = POTR_CRYPTO_TAG_SIZE;
1335
1336 reject_pkt.flags |= htons(POTR_FLAG_ENCRYPTED);
1337 reject_pkt.payload_len = htons((uint16_t)POTR_CRYPTO_TAG_SIZE);
1338
1339 /* ノンス: session_id(4B) + flags(2B, REJECT|ENCRYPTED NBO) + ack_num(4B NBO) + padding(2B) */
1340 memcpy(nonce, &reject_pkt.session_id, 4);
1341 memcpy(nonce + 4, &reject_pkt.flags, 2);
1342 memcpy(nonce + 6, &reject_pkt.ack_num, 4);
1343 memset(nonce + 10, 0, 2);
1344
1345 memcpy(wire_buf, &reject_pkt, PACKET_HEADER_SIZE);
1346 if (potr_encrypt(wire_buf + PACKET_HEADER_SIZE, &enc_out,
1347 NULL, 0,
1348 ctx->service.encrypt_key,
1349 nonce,
1350 wire_buf, PACKET_HEADER_SIZE) != 0)
1351 {
1352 return;
1353 }
1354 wire_len = PACKET_HEADER_SIZE + enc_out;
1355
1356 for (i = 0; i < ctx->n_path; i++)
1357 {
1358#ifndef _WIN32
1359 sendto(ctx->sock[i], wire_buf, wire_len, 0,
1360 (const struct sockaddr *)&ctx->dest_addr[i],
1361 sizeof(ctx->dest_addr[i]));
1362#else /* _WIN32 */
1363 sendto(ctx->sock[i], (const char *)wire_buf, (int)wire_len, 0,
1364 (const struct sockaddr *)&ctx->dest_addr[i],
1365 sizeof(ctx->dest_addr[i]));
1366#endif /* _WIN32 */
1367 }
1368 }
1369 else
1370 {
1371 wire_len = packet_wire_size(&reject_pkt);
1372
1373 for (i = 0; i < ctx->n_path; i++)
1374 {
1375#ifndef _WIN32
1376 sendto(ctx->sock[i], &reject_pkt, wire_len, 0,
1377 (const struct sockaddr *)&ctx->dest_addr[i],
1378 sizeof(ctx->dest_addr[i]));
1379#else /* _WIN32 */
1380 sendto(ctx->sock[i], (const char *)&reject_pkt, (int)wire_len, 0,
1381 (const struct sockaddr *)&ctx->dest_addr[i],
1382 sizeof(ctx->dest_addr[i]));
1383#endif /* _WIN32 */
1384 }
1385 }
1386}
1387
1388/* 受信データを解凍してコールバックに渡す */
1389static void recv_deliver(struct PotrContext_ *ctx,
1390 const uint8_t *payload,
1391 size_t payload_len,
1392 int compressed)
1393{
1394 if (compressed)
1395 {
1396 size_t dec_len = ctx->compress_buf_size;
1397
1398 if (potr_decompress(ctx->compress_buf, &dec_len,
1399 payload, payload_len) == 0)
1400 {
1402 "recv[service_id=%" PRId64 "]: decompress %zu -> %zu bytes",
1403 ctx->service.service_id, payload_len, dec_len);
1406 ctx->compress_buf,
1407 dec_len);
1408 }
1409 else
1410 {
1412 "recv[service_id=%" PRId64 "]: decompress failed (src_len=%zu)",
1413 ctx->service.service_id, payload_len);
1414 }
1415 }
1416 else
1417 {
1419 payload, payload_len);
1420 }
1421}
1422
1423/* ペイロードエレメント 1 件のフラグメント結合・解凍・コールバック処理。
1424 window_recv_pop で取り出した外側パケットを packet_unpack_next で展開した
1425 各ペイロードエレメントに対して呼び出す。 */
1426static void deliver_payload_elem(struct PotrContext_ *ctx, const PotrPacket *elem)
1427{
1428 if (elem->flags & POTR_FLAG_MORE_FRAG)
1429 {
1430 /* 中間フラグメント: バッファに追記 */
1431 if (ctx->frag_buf_len + elem->payload_len <= ctx->global.max_message_size)
1432 {
1433 if (ctx->frag_buf_len == 0)
1434 {
1435 if (elem->flags & POTR_FLAG_COMPRESSED)
1436 {
1437 ctx->frag_compressed = 1;
1438 }
1439 else
1440 {
1441 ctx->frag_compressed = 0;
1442 }
1443 }
1444 memcpy(ctx->frag_buf + ctx->frag_buf_len,
1445 elem->payload, elem->payload_len);
1446 ctx->frag_buf_len += elem->payload_len;
1447 }
1448 else
1449 {
1450 ctx->frag_buf_len = 0;
1451 ctx->frag_compressed = 0;
1452 }
1453 }
1454 else if (ctx->frag_buf_len > 0)
1455 {
1456 /* 最終フラグメント: バッファに追記してコールバック */
1457 if (ctx->frag_buf_len + elem->payload_len <= ctx->global.max_message_size)
1458 {
1459 memcpy(ctx->frag_buf + ctx->frag_buf_len,
1460 elem->payload, elem->payload_len);
1461 ctx->frag_buf_len += elem->payload_len;
1462
1463 if (ctx->callback != NULL)
1464 {
1465 recv_deliver(ctx, ctx->frag_buf,
1466 ctx->frag_buf_len, ctx->frag_compressed);
1467 }
1468 }
1469 ctx->frag_buf_len = 0;
1470 ctx->frag_compressed = 0;
1471 }
1472 else
1473 {
1474 /* フラグメントなし: 直接コールバック */
1475 if (ctx->callback != NULL)
1476 {
1477 int is_compressed;
1478 if (elem->flags & POTR_FLAG_COMPRESSED)
1479 {
1480 is_compressed = 1;
1481 }
1482 else
1483 {
1484 is_compressed = 0;
1485 }
1486 recv_deliver(ctx,
1487 elem->payload,
1488 (size_t)elem->payload_len,
1489 is_compressed);
1490 }
1491 }
1492}
1493
1494/* recv_window から順序整列済みの外側パケットを取り出してペイロードエレメントを配信する。
1495 REJECT 処理後と通常受信処理の両方から呼び出す。 */
1496static void drain_recv_window(struct PotrContext_ *ctx)
1497{
1498 PotrPacket pop_pkt;
1499
1500 while (window_recv_pop(&ctx->recv_window, &pop_pkt) == POTR_SUCCESS)
1501 {
1503
1504 const char *pkt_type_str;
1505 if (pop_pkt.flags & POTR_FLAG_PING)
1506 {
1507 pkt_type_str = "PING";
1508 }
1509 else
1510 {
1511 pkt_type_str = "DATA";
1512 }
1514 "recv[service_id=%" PRId64 "]: pop seq=%u %s",
1515 ctx->service.service_id,
1516 (unsigned)pop_pkt.seq_num,
1517 pkt_type_str);
1518
1519 if (pop_pkt.flags & POTR_FLAG_PING)
1520 {
1521 continue; /* PING: 生存確認のみ、ペイロードエレメント展開不要 */
1522 }
1523
1524 /* DATA: ペイロードエレメントを順に展開して配信 */
1525 {
1526 size_t offset = 0;
1527 PotrPacket elem;
1528
1529 while (packet_unpack_next(&pop_pkt, &offset, &elem) == POTR_SUCCESS)
1530 {
1531 deliver_payload_elem(ctx, &elem);
1532 }
1533 }
1534 }
1535}
1536
1537/* RAW モード用: DISCONNECTED イベントを発行してセッション状態を部分的にリセットする。
1538 ウィンドウリセットは呼び出し元が行う (新しい基点通番が確定してから呼ぶため)。
1539 フラグメント組み立てバッファも破棄する。 */
1540static void raw_session_disconnect(struct PotrContext_ *ctx)
1541{
1542 if (ctx->health_alive)
1543 {
1544 ctx->health_alive = 0;
1546 "recv[service_id=%" PRId64 "]: RAW DISCONNECTED (gap detected)",
1547 ctx->service.service_id);
1548 if (ctx->callback != NULL)
1549 {
1551 POTR_EVENT_DISCONNECTED, NULL, 0);
1552 }
1553 }
1554
1555 /* フラグメント組み立て途中状態を破棄 */
1556 ctx->frag_buf_len = 0;
1557 ctx->frag_compressed = 0;
1558}
1559
1560/* 外側パケット (DATA / PING) を受信ウィンドウに投入し、順序整列済みパケットを配信する。
1561 再送・順序整列の単位は外側パケットであり、NACK も外側パケットの seq_num を指定する。
1562 RAW モードでは NACK を送信せず、ギャップ検出時は DISCONNECTED を発行してウィンドウを
1563 新しい基点通番でリセットする。 */
1564static void process_outer_pkt(struct PotrContext_ *ctx,
1565 const PotrPacket *pkt)
1566{
1567 uint32_t nack_num;
1568 uint32_t stretch;
1569 int is_raw = potr_is_raw_type(ctx->service.type);
1570
1571 if (window_recv_push(&ctx->recv_window, pkt) != POTR_SUCCESS)
1572 {
1573 if (is_raw)
1574 {
1575 /* ウィンドウ満杯: DISCONNECTED を発行し、受信したパケットの通番でリセットしてから
1576 再投入する。再投入は必ず成功する (空ウィンドウの先頭スロット)。 */
1578 "recv[service_id=%" PRId64 "]: RAW recv_window full, resetting to seq=%u",
1579 ctx->service.service_id, (unsigned)pkt->seq_num);
1582 if (window_recv_push(&ctx->recv_window, pkt) != POTR_SUCCESS)
1583 {
1584 /* リセット直後の再投入失敗は想定外 */
1586 "recv[service_id=%" PRId64 "]: RAW window re-push failed seq=%u (bug)",
1587 ctx->service.service_id, (unsigned)pkt->seq_num);
1588 return;
1589 }
1590 }
1591 else
1592 {
1593 /* 通番がウィンドウ範囲外のためドロップ (受信ウィンドウ満杯、または古い重複パケット)。
1594 受信者は next_seq を待ち続けるが、ヘルスチェックや後続パケット到着時に NACK が送られる。 */
1596 "recv[service_id=%" PRId64 "]: recv_window full (100%%), dropping seq=%u"
1597 " (base_seq=%u window_size=%u)",
1598 ctx->service.service_id, (unsigned)pkt->seq_num,
1599 (unsigned)ctx->recv_window.base_seq,
1600 (unsigned)ctx->recv_window.window_size);
1601 return;
1602 }
1603 }
1604
1605 /* ウィンドウ利用率チェック: 今回のパケットが占める先頭からの距離で推定する。
1606 stretch = pkt->seq_num - base_seq + 1 。push 成功後は [1, window_size] の範囲。 */
1607 stretch = (uint32_t)(pkt->seq_num - ctx->recv_window.base_seq) + 1U;
1608 if (stretch * 10U >= (uint32_t)ctx->recv_window.window_size * 8U)
1609 {
1611 "recv[service_id=%" PRId64 "]: recv_window utilization high (%u/%u >= 80%%)"
1612 " seq=%u base_seq=%u",
1613 ctx->service.service_id,
1614 (unsigned)stretch, (unsigned)ctx->recv_window.window_size,
1615 (unsigned)pkt->seq_num, (unsigned)ctx->recv_window.base_seq);
1616 }
1617
1618 if (window_recv_needs_nack(&ctx->recv_window, &nack_num))
1619 {
1620 if (is_raw)
1621 {
1622 /* ギャップ検出: リオーダー待機を確認してから DISCONNECTED を発行する。
1623 reorder_timeout_ms > 0 のとき、タイムアウト前はウィンドウに今のパケットを
1624 残したまま待機する。タイムアウト後または即時モードでは reset + 再 push。
1625 skip ループは push 済みパケットのスロットマッピングを壊すため使用しない。 */
1626 if (reorder_gap_ready(ctx, nack_num))
1627 {
1630 if (window_recv_push(&ctx->recv_window, pkt) != POTR_SUCCESS)
1631 {
1632 /* リセット直後の再投入失敗は想定外 */
1634 "recv[service_id=%" PRId64 "]: RAW gap re-push failed seq=%u (bug)",
1635 ctx->service.service_id, (unsigned)pkt->seq_num);
1636 return;
1637 }
1638 }
1639 /* else: リオーダー待機中。パケットはウィンドウに push 済み。 */
1640 }
1641 else
1642 {
1643 if (reorder_gap_ready(ctx, nack_num))
1644 {
1646 "recv[service_id=%" PRId64 "]: NACK seq=%u",
1647 ctx->service.service_id, (unsigned)nack_num);
1648 send_nack(ctx, nack_num);
1649 }
1650 /* else: リオーダー待機中。NACK 送出を保留。 */
1651 }
1652 }
1653 else
1654 {
1655 /* 欠番なし: 待機中の欠番が埋まった (または元から欠番なし) */
1656 ctx->reorder_pending = 0;
1657 }
1658
1659 drain_recv_window(ctx);
1660
1661 /* drain 後に next_seq が前進した結果、新たな欠番が先頭に現れる場合は NACK を送る。
1662 例: seq=3,4 欠落・seq=5 着時、drain 前は NACK(3)、seq=3 再送着→ drain で pop 後
1663 next_seq=4 が欠番になるが次のパケット到着まで NACK(4) が遅延するのを防ぐ。
1664 RAW モードで reorder_gap_ready が 1 を返した場合は reset + 再 push によりウィンドウに
1665 パケット 1 つのみ残るため drain 後は空となりここに到達しない。
1666 リオーダー待機中 (RAW/通常) は drain で前進しないため post-drain 欠番も保留のまま。 */
1667 if (window_recv_needs_nack(&ctx->recv_window, &nack_num))
1668 {
1669 if (!is_raw && reorder_gap_ready(ctx, nack_num))
1670 {
1672 "recv[service_id=%" PRId64 "]: NACK seq=%u (post-drain)",
1673 ctx->service.service_id, (unsigned)nack_num);
1674 send_nack(ctx, nack_num);
1675 }
1676 /* RAW モードでは reset + 再 push 後は到達しない。リオーダー待機中は
1677 check_reorder_timeout がタイムアウト処理を担う。 */
1678 }
1679 else
1680 {
1681 ctx->reorder_pending = 0;
1682 }
1683}
1684
1685/* 受信スレッド本体 */
1686#ifndef _WIN32
1687static void *recv_thread_func(void *arg)
1688#else /* _WIN32 */
1689static DWORD WINAPI recv_thread_func(LPVOID arg)
1690#endif /* _WIN32 */
1691{
1692 struct PotrContext_ *ctx = (struct PotrContext_ *)arg;
1693 uint8_t *buf = ctx->recv_buf; /* PACKET_HEADER_SIZE + max_payload バイト */
1694 PotrPacket pkt;
1695 struct sockaddr_in sender_addr;
1696 uint32_t poll_ms;
1697
1699 && ctx->global.health_timeout_ms > 0U)
1700 {
1701 poll_ms = ctx->global.health_timeout_ms / 3U;
1702 if (poll_ms < 100U) poll_ms = 100U;
1703 }
1704 else
1705 {
1706 poll_ms = 500U;
1707 }
1708 /* reorder_timeout_ms が有効な場合は poll 間隔を短縮してタイムアウト精度を確保する */
1710 && ctx->global.reorder_timeout_ms > 0U)
1711 {
1712 if (ctx->global.reorder_timeout_ms < poll_ms)
1713 {
1714 poll_ms = ctx->global.reorder_timeout_ms;
1715 }
1716 }
1717
1718 while (ctx->running[0])
1719 {
1720 fd_set readfds;
1721 struct timeval tv;
1722 int ret;
1723 int maxfd = -1;
1724 int i;
1725
1726 FD_ZERO(&readfds);
1727 for (i = 0; i < ctx->n_path; i++)
1728 {
1729 if (ctx->sock[i] == POTR_INVALID_SOCKET) continue;
1730#ifndef _WIN32
1731 FD_SET(ctx->sock[i], &readfds);
1732 if (ctx->sock[i] > maxfd) maxfd = ctx->sock[i];
1733#else /* _WIN32 */
1734 FD_SET(ctx->sock[i], &readfds);
1735 /* Windows では SOCKET は UINT_PTR。maxfd の代わりに n_path を使う */
1736#endif /* _WIN32 */
1737 }
1738
1739#ifdef _WIN32
1740 /* Windows: select の第1引数は無視されるが 0 でなく n_path を渡す */
1741 maxfd = ctx->n_path;
1742#endif /* _WIN32 */
1743
1744 if (maxfd < 0) break;
1745
1746 tv.tv_sec = (long)(poll_ms / 1000U);
1747 tv.tv_usec = (long)((poll_ms % 1000U) * 1000U);
1748
1749 ret = select(maxfd + 1, &readfds, NULL, NULL, &tv);
1750
1751 if (ret < 0)
1752 {
1753 if (!ctx->running[0]) break;
1754 continue;
1755 }
1756
1757 if (ret == 0)
1758 {
1759 if (ctx->is_multi_peer)
1760 {
1762 }
1763 else if (ctx->role == POTR_ROLE_RECEIVER
1765 {
1767 if (ctx->global.reorder_timeout_ms > 0U)
1768 {
1770 }
1771 }
1772 continue;
1773 }
1774
1775 for (i = 0; i < ctx->n_path; i++)
1776 {
1777 int recv_len;
1778#ifndef _WIN32
1779 socklen_t sender_len;
1780#else /* _WIN32 */
1781 int sender_len;
1782#endif /* _WIN32 */
1783
1784 if (ctx->sock[i] == POTR_INVALID_SOCKET) continue;
1785 if (!FD_ISSET(ctx->sock[i], &readfds)) continue;
1786
1787 memset(&sender_addr, 0, sizeof(sender_addr));
1788 sender_len = sizeof(sender_addr);
1789
1790#ifndef _WIN32
1791 recv_len = (int)recvfrom(ctx->sock[i], buf,
1793 0, (struct sockaddr *)&sender_addr,
1794 &sender_len);
1795#else /* _WIN32 */
1796 recv_len = recvfrom(ctx->sock[i], (char *)buf,
1798 0, (struct sockaddr *)&sender_addr, &sender_len);
1799#endif /* _WIN32 */
1800 if (recv_len <= 0)
1801 {
1802 if (!ctx->running[0]) break; /* 正常終了: ソケットクローズによる割り込み */
1804 "recv[service_id=%" PRId64 "]: recvfrom returned %d",
1805 ctx->service.service_id, recv_len);
1806 continue;
1807 }
1808
1809 if (packet_parse(&pkt, buf, (size_t)recv_len) != POTR_SUCCESS)
1810 {
1812 "recv[service_id=%" PRId64 "]: packet parse failed (len=%d)",
1813 ctx->service.service_id, recv_len);
1814 continue;
1815 }
1816 if (pkt.service_id != ctx->service.service_id)
1817 {
1819 "recv[service_id=%" PRId64 "]: ignored packet for service_id=%" PRId64 "",
1820 ctx->service.service_id, pkt.service_id);
1821 continue;
1822 }
1823 if (!check_src_addr(ctx, &sender_addr)) continue;
1824
1825 /* ── N:1 モード: ピアごとにディスパッチ ── */
1826 if (ctx->is_multi_peer)
1827 {
1828 PotrPeerContext *peer = NULL;
1829 int is_new_peer = 0;
1830
1832
1833 /* session_triplet でピアを検索 */
1834 peer = peer_find_by_session(ctx,
1835 pkt.session_id,
1836 pkt.session_tv_sec,
1837 pkt.session_tv_nsec);
1838
1839 if (peer == NULL)
1840 {
1841 /* 新規ピア: SYN (初回パケット) として扱い、ピアを作成する */
1842 if (!(pkt.flags & (POTR_FLAG_FIN | POTR_FLAG_NACK)))
1843 {
1844 peer = peer_create(ctx, &sender_addr, i);
1845 if (peer != NULL)
1846 {
1847 /* ピアのセッション情報を記録 */
1848 peer->peer_session_id = pkt.session_id;
1851 peer->peer_session_known = 1;
1852 window_init(&peer->recv_window, pkt.seq_num,
1853 ctx->global.window_size,
1854 ctx->global.max_payload);
1855 is_new_peer = 1;
1856 }
1857 }
1858 }
1859
1860 if (peer == NULL)
1861 {
1863 continue; /* max_peers 超過またはパース不可パケット */
1864 }
1865
1866 /* 新規ピア: CONNECTED コールバックを発火 */
1867 if (is_new_peer)
1868 {
1869 peer->health_alive = 1;
1871 "recv[service_id=%" PRId64 "]: CONNECTED peer=%u from %u.%u.%u.%u:%u",
1872 ctx->service.service_id, (unsigned)peer->peer_id,
1873 (unsigned)((ntohl(sender_addr.sin_addr.s_addr) >> 24) & 0xFF),
1874 (unsigned)((ntohl(sender_addr.sin_addr.s_addr) >> 16) & 0xFF),
1875 (unsigned)((ntohl(sender_addr.sin_addr.s_addr) >> 8) & 0xFF),
1876 (unsigned)( ntohl(sender_addr.sin_addr.s_addr) & 0xFF),
1877 (unsigned)ntohs(sender_addr.sin_port));
1878 if (ctx->callback != NULL)
1879 {
1880 ctx->callback(ctx->service.service_id, peer->peer_id,
1881 POTR_EVENT_CONNECTED, NULL, 0);
1882 }
1883 }
1884
1885 /* 暗号化パケットを復号 */
1888 {
1889 uint8_t nonce[POTR_CRYPTO_NONCE_SIZE];
1890 size_t dec_len = ctx->crypto_buf_size;
1891 uint32_t sid_nbo = htonl(pkt.session_id);
1892 uint16_t flags_nbo = htons((uint16_t)pkt.flags);
1893 uint32_t seq_nbo = htonl(pkt.seq_num);
1894
1895 /* ノンス: session_id(4B) + flags(2B) + seq_num(4B) + padding(2B) */
1896 memcpy(nonce, &sid_nbo, 4);
1897 memcpy(nonce + 4, &flags_nbo, 2);
1898 memcpy(nonce + 6, &seq_nbo, 4);
1899 memset(nonce + 10, 0, 2);
1900
1901 if (potr_decrypt(ctx->crypto_buf, &dec_len,
1902 pkt.payload, pkt.payload_len,
1903 ctx->service.encrypt_key,
1904 nonce,
1905 buf, PACKET_HEADER_SIZE) != 0)
1906 {
1908 continue;
1909 }
1910
1911 pkt.payload = ctx->crypto_buf;
1912 pkt.payload_len = (uint16_t)dec_len;
1913 pkt.flags = (uint16_t)(pkt.flags & ~POTR_FLAG_ENCRYPTED);
1914 }
1915 else if (pkt.flags & POTR_FLAG_ENCRYPTED)
1916 {
1917 /* PING/NACK/REJECT/FIN の GCM 認証タグ検証 */
1919 {
1921 continue;
1922 }
1923 {
1924 uint8_t nonce[POTR_CRYPTO_NONCE_SIZE];
1925 uint8_t dummy[1];
1926 size_t dummy_len = sizeof(dummy);
1927 uint32_t val;
1928 uint32_t sid_nbo = htonl(pkt.session_id);
1929 uint16_t flags_nbo = htons((uint16_t)pkt.flags);
1930 uint32_t val_nbo;
1931
1932 /* NACK/REJECT は ack_num、その他 (PING/FIN) は seq_num */
1933 val = (pkt.flags & (POTR_FLAG_NACK | POTR_FLAG_REJECT))
1934 ? pkt.ack_num : pkt.seq_num;
1935 val_nbo = htonl(val);
1936
1937 memcpy(nonce, &sid_nbo, 4);
1938 memcpy(nonce + 4, &flags_nbo, 2);
1939 memcpy(nonce + 6, &val_nbo, 4);
1940 memset(nonce + 10, 0, 2);
1941
1942 if (potr_decrypt(dummy, &dummy_len,
1944 ctx->service.encrypt_key,
1945 nonce,
1946 buf, PACKET_HEADER_SIZE) != 0)
1947 {
1949 continue;
1950 }
1951 pkt.flags = (uint16_t)(pkt.flags & ~POTR_FLAG_ENCRYPTED);
1952 pkt.payload_len = 0;
1953 pkt.payload = NULL;
1954 }
1955 }
1956
1957 /* FIN: ピアの正常終了通知 */
1958 if (pkt.flags & POTR_FLAG_FIN)
1959 {
1961 "recv[service_id=%" PRId64 "]: peer=%u FIN received -> DISCONNECTED",
1962 ctx->service.service_id, (unsigned)peer->peer_id);
1963
1964 if (peer->health_alive && ctx->callback != NULL)
1965 {
1966 peer->health_alive = 0;
1967 ctx->callback(ctx->service.service_id, peer->peer_id,
1968 POTR_EVENT_DISCONNECTED, NULL, 0);
1969 }
1970
1971 peer_free(ctx, peer);
1973 continue;
1974 }
1975
1976 /* NACK: 送信ウィンドウから再送する */
1977 if (pkt.flags & POTR_FLAG_NACK)
1978 {
1979 PotrPacket resend_pkt;
1980 size_t wire_len = 0;
1981 int get_result;
1982 int j;
1983
1984#ifndef _WIN32
1985 pthread_mutex_lock(&peer->send_window_mutex);
1986#else /* _WIN32 */
1987 EnterCriticalSection(&peer->send_window_mutex);
1988#endif /* _WIN32 */
1989 get_result = window_send_get(&peer->send_window,
1990 pkt.ack_num, &resend_pkt);
1991 if (get_result == POTR_SUCCESS)
1992 {
1993 wire_len = packet_wire_size(&resend_pkt);
1994 memcpy(ctx->recv_buf, &resend_pkt, PACKET_HEADER_SIZE);
1995 memcpy(ctx->recv_buf + PACKET_HEADER_SIZE,
1996 resend_pkt.payload,
1997 wire_len - PACKET_HEADER_SIZE);
1998 }
1999#ifndef _WIN32
2000 pthread_mutex_unlock(&peer->send_window_mutex);
2001#else /* _WIN32 */
2002 LeaveCriticalSection(&peer->send_window_mutex);
2003#endif /* _WIN32 */
2004
2005 if (get_result == POTR_SUCCESS)
2006 {
2008 "recv[service_id=%" PRId64 "]: peer=%u NACK seq=%u -> retransmit",
2009 ctx->service.service_id, (unsigned)peer->peer_id,
2010 (unsigned)pkt.ack_num);
2011 for (j = 0; j < (int)POTR_MAX_PATH; j++)
2012 {
2013 if (peer->dest_addr[j].sin_family == 0) continue;
2014#ifndef _WIN32
2015 sendto(ctx->sock[j], ctx->recv_buf, wire_len, 0,
2016 (const struct sockaddr *)&peer->dest_addr[j],
2017 sizeof(peer->dest_addr[j]));
2018#else /* _WIN32 */
2019 sendto(ctx->sock[j], (const char *)ctx->recv_buf,
2020 (int)wire_len, 0,
2021 (const struct sockaddr *)&peer->dest_addr[j],
2022 sizeof(peer->dest_addr[j]));
2023#endif /* _WIN32 */
2024 }
2025 }
2026 else
2027 {
2029 "recv[service_id=%" PRId64 "]: peer=%u NACK seq=%u not in window -> REJECT",
2030 ctx->service.service_id, (unsigned)peer->peer_id,
2031 (unsigned)pkt.ack_num);
2032 n1_send_reject(ctx, peer, pkt.ack_num);
2033 }
2034
2036 continue;
2037 }
2038
2039 /* REJECT */
2040 if (pkt.flags & POTR_FLAG_REJECT)
2041 {
2042 if (!n1_check_and_update_session(ctx, peer, &pkt))
2043 {
2045 continue;
2046 }
2047 n1_update_path_recv(peer, &sender_addr, i);
2048
2049 if (peer->health_alive && ctx->callback != NULL)
2050 {
2051 peer->health_alive = 0;
2052 ctx->callback(ctx->service.service_id, peer->peer_id,
2053 POTR_EVENT_DISCONNECTED, NULL, 0);
2054 }
2055
2056 peer->reorder_pending = 0;
2058 n1_drain_recv_window(ctx, peer);
2060 continue;
2061 }
2062
2063 /* DATA / PING */
2064 if (!(pkt.flags & (POTR_FLAG_DATA | POTR_FLAG_PING)))
2065 {
2067 continue;
2068 }
2069
2070 if (!n1_check_and_update_session(ctx, peer, &pkt))
2071 {
2073 continue;
2074 }
2075
2076 n1_update_path_recv(peer, &sender_addr, i);
2077
2079 "recv[service_id=%" PRId64 "]: peer=%u %s seq=%u",
2080 ctx->service.service_id, (unsigned)peer->peer_id,
2081 (pkt.flags & POTR_FLAG_PING) ? "PING" : "DATA",
2082 (unsigned)pkt.seq_num);
2083
2084 if (pkt.flags & POTR_FLAG_PING)
2085 {
2086 if (pkt.seq_num != peer->recv_window.next_seq
2088 peer->recv_window.next_seq + 1U,
2089 peer->recv_window.window_size))
2090 {
2091 if (n1_reorder_gap_ready(peer, peer->recv_window.next_seq))
2092 {
2093 n1_send_nack(ctx, peer, peer->recv_window.next_seq);
2094 }
2095 }
2096 else
2097 {
2098 n1_notify_health_alive(ctx, peer);
2099 n1_send_ping_reply(ctx, peer, pkt.seq_num);
2100 }
2102 continue;
2103 }
2104
2105 n1_process_outer_pkt(ctx, peer, &pkt);
2107 continue;
2108 }
2109
2110 /* 暗号化パケットを復号する
2111 * - POTR_FLAG_DATA | POTR_FLAG_ENCRYPTED の組み合わせ: ペイロードを復号
2112 * - POTR_FLAG_ENCRYPTED 単独 (PING/NACK/REJECT/FIN): GCM タグのみ検証
2113 * - 復号後 pkt.payload / pkt.payload_len を書き換えて以降の処理を透過させる
2114 * - 認証失敗 (タグ不一致) は即座に破棄する
2115 */
2118 {
2119 uint8_t nonce[POTR_CRYPTO_NONCE_SIZE];
2120 size_t dec_len = ctx->crypto_buf_size;
2121 /* pkt.session_id / pkt.seq_num は packet_parse 後はホストオーダー */
2122 uint32_t sid_nbo = htonl(pkt.session_id);
2123 uint16_t flags_nbo = htons((uint16_t)pkt.flags);
2124 uint32_t seq_nbo = htonl(pkt.seq_num);
2125
2126 /* ノンス: session_id(4B NBO) + flags(2B NBO) + seq_num(4B NBO) + padding(2B) */
2127 memcpy(nonce, &sid_nbo, 4);
2128 memcpy(nonce + 4, &flags_nbo, 2);
2129 memcpy(nonce + 6, &seq_nbo, 4);
2130 memset(nonce + 10, 0, 2);
2131
2132 /* AAD = 受信 raw バイト先頭 32B (NBO、送信側と同一) */
2133 if (potr_decrypt(ctx->crypto_buf, &dec_len,
2134 pkt.payload, pkt.payload_len,
2135 ctx->service.encrypt_key,
2136 nonce,
2137 buf, PACKET_HEADER_SIZE) != 0)
2138 {
2140 "recv[service_id=%" PRId64 "]: decrypt failed (auth) seq=%u",
2141 ctx->service.service_id, (unsigned)pkt.seq_num);
2142 continue;
2143 }
2144
2145 pkt.payload = ctx->crypto_buf;
2146 pkt.payload_len = (uint16_t)dec_len;
2147 pkt.flags = (uint16_t)(pkt.flags & ~POTR_FLAG_ENCRYPTED);
2148 }
2149 else if (pkt.flags & POTR_FLAG_ENCRYPTED)
2150 {
2151 /* ペイロードなしパケット (PING/NACK/REJECT/FIN) の GCM タグ検証 */
2152 uint8_t nonce[POTR_CRYPTO_NONCE_SIZE];
2153 uint8_t dummy[1];
2154 size_t dummy_len = sizeof(dummy);
2155 uint32_t val;
2156 uint32_t sid_nbo = htonl(pkt.session_id);
2157 uint16_t flags_nbo = htons((uint16_t)pkt.flags);
2158 uint32_t val_nbo;
2159
2161 {
2163 "recv[service_id=%" PRId64 "]: encrypted no-payload pkt bad len=%u",
2164 ctx->service.service_id, (unsigned)pkt.payload_len);
2165 continue;
2166 }
2167
2168 /* NACK/REJECT は ack_num、それ以外 (PING/FIN) は seq_num をノンスに使用 */
2169 val = (pkt.flags & (POTR_FLAG_NACK | POTR_FLAG_REJECT))
2170 ? pkt.ack_num : pkt.seq_num;
2171 val_nbo = htonl(val);
2172
2173 /* ノンス: session_id(4B NBO) + flags(2B NBO) + val(4B NBO) + padding(2B) */
2174 memcpy(nonce, &sid_nbo, 4);
2175 memcpy(nonce + 4, &flags_nbo, 2);
2176 memcpy(nonce + 6, &val_nbo, 4);
2177 memset(nonce + 10, 0, 2);
2178
2179 /* AAD = 受信 raw バイト先頭 32B (NBO、送信側と同一) */
2180 if (potr_decrypt(dummy, &dummy_len,
2182 ctx->service.encrypt_key,
2183 nonce,
2184 buf, PACKET_HEADER_SIZE) != 0)
2185 {
2187 "recv[service_id=%" PRId64 "]: tag verify failed flags=0x%04x",
2188 ctx->service.service_id, (unsigned)pkt.flags);
2189 continue;
2190 }
2191
2192 /* 検証成功: ENCRYPTED フラグを除去して後続処理へ */
2193 pkt.flags = (uint16_t)(pkt.flags & ~POTR_FLAG_ENCRYPTED);
2194 pkt.payload_len = 0;
2195 pkt.payload = NULL;
2196 }
2197
2198 /* ── 送信者ロール: NACK のみ処理 ── */
2199 if (ctx->role == POTR_ROLE_SENDER)
2200 {
2201 /* RAW モードは再送バッファを持たないため NACK を無視する */
2202 if (potr_is_raw_type(ctx->service.type))
2203 {
2204 continue;
2205 }
2206
2207 if (pkt.flags & POTR_FLAG_NACK)
2208 {
2209 int j;
2210
2211 /* 同一 ack_num の NACK が POTR_NACK_DEDUP_MS 以内に届いた場合は破棄 */
2212 {
2213 uint64_t now_ms = get_ms_mono();
2214 int dedup_idx;
2215 int is_dup = 0;
2216
2217 for (dedup_idx = 0;
2218 dedup_idx < (int)POTR_NACK_DEDUP_SLOTS;
2219 dedup_idx++)
2220 {
2221 const PotrNackDedupEntry *e =
2222 &ctx->nack_dedup_buf[dedup_idx];
2223 if (e->time_ms != 0
2224 && e->ack_num == pkt.ack_num
2225 && (now_ms - e->time_ms)
2226 < (uint64_t)POTR_NACK_DEDUP_MS)
2227 {
2228 is_dup = 1;
2229 break;
2230 }
2231 }
2232
2233 if (is_dup)
2234 {
2235 continue;
2236 }
2237
2239 pkt.ack_num;
2241 now_ms;
2242 ctx->nack_dedup_next =
2243 (uint8_t)((ctx->nack_dedup_next + 1U)
2245 }
2246
2247 {
2248 PotrPacket resend_pkt;
2249 int get_result;
2250 size_t wire_len = 0;
2251
2252 /* send_window へのアクセスを排他制御する (送信スレッド・ヘルスチェックスレッドと競合)。
2253 ミューテックス保持中に recv_buf へ wire データを組み立て、
2254 プールスロットが上書きされる前にコピーを完了させる。 */
2255#ifndef _WIN32
2256 pthread_mutex_lock(&ctx->send_window_mutex);
2257#else /* _WIN32 */
2258 EnterCriticalSection(&ctx->send_window_mutex);
2259#endif /* _WIN32 */
2260 get_result = window_send_get(&ctx->send_window,
2261 pkt.ack_num,
2262 &resend_pkt);
2263
2264 if (get_result == POTR_SUCCESS)
2265 {
2266 /* [NBO ヘッダー 32B][ペイロード] を recv_buf に組み立てる */
2267 wire_len = packet_wire_size(&resend_pkt);
2268 memcpy(ctx->recv_buf, &resend_pkt, PACKET_HEADER_SIZE);
2269 memcpy(ctx->recv_buf + PACKET_HEADER_SIZE,
2270 resend_pkt.payload,
2271 wire_len - PACKET_HEADER_SIZE);
2272 }
2273
2274#ifndef _WIN32
2275 pthread_mutex_unlock(&ctx->send_window_mutex);
2276#else /* _WIN32 */
2277 LeaveCriticalSection(&ctx->send_window_mutex);
2278#endif /* _WIN32 */
2279
2280 if (get_result == POTR_SUCCESS)
2281 {
2283 "sender[service_id=%" PRId64 "]: NACK received seq=%u"
2284 " -> retransmit",
2285 ctx->service.service_id,
2286 (unsigned)pkt.ack_num);
2287 for (j = 0; j < ctx->n_path; j++)
2288 {
2289#ifndef _WIN32
2290 sendto(ctx->sock[j], ctx->recv_buf, wire_len, 0,
2291 (const struct sockaddr *)&ctx->dest_addr[j],
2292 sizeof(ctx->dest_addr[j]));
2293#else /* _WIN32 */
2294 sendto(ctx->sock[j], (const char *)ctx->recv_buf,
2295 (int)wire_len, 0,
2296 (const struct sockaddr *)&ctx->dest_addr[j],
2297 sizeof(ctx->dest_addr[j]));
2298#endif /* _WIN32 */
2299 }
2300 }
2301 else
2302 {
2304 "sender[service_id=%" PRId64 "]: NACK seq=%u not in window"
2305 " -> REJECT",
2306 ctx->service.service_id,
2307 (unsigned)pkt.ack_num);
2308 send_reject(ctx, pkt.ack_num);
2309 }
2310 }
2311 }
2312 /* ACK・その他 (DATA/PING の反射など) は無視 */
2314 {
2315 continue;
2316 }
2317 /* UNICAST_BIDIR SENDER: フォールスルーして受信者処理 (FIN/REJECT/DATA/PING) へ */
2318 }
2319
2320 /* ── 受信者ロール: FIN / REJECT / DATA / PING を処理 ── */
2321
2322 /* FIN: 送信者からの正常終了通知 → 即座に DISCONNECTED へ遷移 */
2323 if (pkt.flags & POTR_FLAG_FIN)
2324 {
2325 if (!check_and_update_session(ctx, &pkt))
2326 {
2327 continue; /* 旧セッションの FIN → 無視 */
2328 }
2329
2331 "recv[service_id=%" PRId64 "]: FIN received -> DISCONNECTED",
2332 ctx->service.service_id);
2333
2334 /* health_alive で重複発火を防止する (ヘルスチェック有無によらず接続済み状態のみ) */
2335 if (ctx->health_alive)
2336 {
2337 ctx->health_alive = 0;
2338 if (ctx->callback != NULL)
2339 {
2341 POTR_EVENT_DISCONNECTED, NULL, 0);
2342 }
2343 }
2344
2345 /* セッション状態をリセットして次の接続を受け入れ可能にする。
2346 受信ウィンドウも初期化して前セッションの next_seq を残さない。 */
2347 ctx->peer_session_known = 0;
2348 ctx->reorder_pending = 0;
2349 ctx->last_recv_tv_sec = 0;
2350 window_init(&ctx->recv_window, 0,
2352 continue;
2353 }
2354
2355 /* REJECT: 欠落外側パケットをスキップして後続パケットを配信する */
2356 if (pkt.flags & POTR_FLAG_REJECT)
2357 {
2358 /* RAW モードは REJECT を発生させない (念のため無視) */
2359 if (potr_is_raw_type(ctx->service.type))
2360 {
2361 continue;
2362 }
2363
2364 if (!check_and_update_session(ctx, &pkt))
2365 {
2366 continue;
2367 }
2368
2369 /* 送信元から受信できている = 生存確認としてタイムアウトをリセットする */
2370 update_path_recv(ctx, i, &sender_addr);
2371
2373 "recv[service_id=%" PRId64 "]: REJECT received seq=%u"
2374 " (packet unrecoverable)",
2375 ctx->service.service_id, (unsigned)pkt.ack_num);
2376
2377 /* DISCONNECTED イベント発火 (接続済みのときのみ。連続 REJECT で重複しない) */
2378 if (ctx->health_alive)
2379 {
2380 ctx->health_alive = 0;
2381 if (ctx->callback != NULL)
2382 {
2384 POTR_EVENT_DISCONNECTED, NULL, 0);
2385 }
2386 }
2387
2388 /* 欠落外側パケットをスキップして recv_window を前進させる */
2389 ctx->reorder_pending = 0;
2391
2392 /* 後続パケットを配信 (ウィンドウに溜まっていれば最初の pop で CONNECTED を発火) */
2393 drain_recv_window(ctx);
2394 continue;
2395 }
2396
2397 /* NACK / ACK は受信者では無視 */
2398 if (!(pkt.flags & (POTR_FLAG_DATA | POTR_FLAG_PING)))
2399 {
2400 continue;
2401 }
2402
2403 /* セッション照合 */
2404 if (!check_and_update_session(ctx, &pkt))
2405 {
2406 continue;
2407 }
2408
2409 /* 受信時刻とパスポートを更新 (健全性タイムアウト計算用) */
2410 update_path_recv(ctx, i, &sender_addr);
2411
2412 {
2413 const char *pkt_kind_str;
2414 if (pkt.flags & POTR_FLAG_PING)
2415 {
2416 pkt_kind_str = "PING";
2417 }
2418 else
2419 {
2420 pkt_kind_str = "DATA";
2421 }
2423 "recv[service_id=%" PRId64 "]: %s seq=%u path=%d",
2424 ctx->service.service_id,
2425 pkt_kind_str,
2426 (unsigned)pkt.seq_num, i);
2427 }
2428
2429 if (pkt.flags & POTR_FLAG_PING)
2430 {
2431 /* PING はウィンドウ外: NACK・再送の対象外。
2432 seq_num は送信側の next_seq (消費前) を示す。
2433 通常モード: [recv_window.next_seq, seq_num) の範囲を全スキャンして欠番を一括 NACK する。
2434 RAW モード: ギャップがあれば DISCONNECTED を発行してウィンドウを新基点にリセットする。 */
2435 if (potr_is_raw_type(ctx->service.type))
2436 {
2437 /* pkt.seq_num が next_seq より前方にある (= ギャップあり) か判定する。
2438 seqnum_in_window で window_size 以内の前方範囲のみを対象とし、
2439 古い PING (next_seq より後方) は無視する。
2440 reorder_timeout_ms > 0 のとき、タイムアウト前は DISCONNECTED を保留する。 */
2441 if (pkt.seq_num != ctx->recv_window.next_seq
2443 ctx->recv_window.next_seq + 1U,
2445 {
2446 if (reorder_gap_ready(ctx, ctx->recv_window.next_seq))
2447 {
2450 }
2451 /* else: リオーダー待機中。次の PING/DATA 到着時に再判定する。 */
2452 }
2453 else
2454 {
2455 ctx->reorder_pending = 0; /* ギャップなし */
2456 }
2458 }
2459 else
2460 {
2462
2463 if (ctx->service.type == POTR_TYPE_UNICAST_BIDIR && pkt.ack_num != 0)
2464 {
2465 /* PING 応答受信: ギャップスキャン不要 */
2466 }
2467 else
2468 {
2469 {
2470 /* 先頭欠番のリオーダー待機を確認してから NACK スキャンを行う。
2471 最初の欠番が待機中の場合はその後続の欠番も一括保留する。
2472 スキャン完了 (全有効または全 NACK 送出済み) 時にリオーダー状態をクリアする。 */
2473 uint32_t scan_seq = ctx->recv_window.next_seq;
2474 while (scan_seq != pkt.seq_num
2475 && seqnum_in_window(scan_seq, ctx->recv_window.base_seq,
2477 {
2478 uint16_t idx = (uint16_t)((scan_seq - ctx->recv_window.base_seq)
2479 % ctx->recv_window.window_size);
2480 if (!ctx->recv_window.valid[idx])
2481 {
2482 if (reorder_gap_ready(ctx, scan_seq))
2483 {
2485 "recv[service_id=%" PRId64 "]: NACK seq=%u (PING gap scan)",
2486 ctx->service.service_id, (unsigned)scan_seq);
2487 send_nack(ctx, scan_seq);
2488 }
2489 else
2490 {
2491 break; /* リオーダー待機中: 後続欠番も保留 */
2492 }
2493 }
2494 scan_seq++;
2495 }
2496 /* スキャン完了: リオーダー状態をクリア */
2497 if (scan_seq == pkt.seq_num)
2498 {
2499 ctx->reorder_pending = 0;
2500 }
2501 }
2502
2503 /* UNICAST_BIDIR: PING 要求に対して PING 応答を返す */
2505 {
2506 send_ping_reply(ctx, pkt.seq_num);
2507 }
2508 }
2509 }
2510 }
2511 else
2512 {
2513 /* DATA: ウィンドウ経由で順序整列・配信 (RAW モードも同じ経路。
2514 RAW モードではギャップ検出時に NACK の代わりに DISCONNECTED を発行する)。 */
2515 process_outer_pkt(ctx, &pkt);
2516 }
2517 }
2518 }
2519
2520#ifndef _WIN32
2521 return NULL;
2522#else /* _WIN32 */
2523 return 0;
2524#endif /* _WIN32 */
2525}
2526
2527/* ================================================================
2528 * TCP ストリーム受信スレッド
2529 * ================================================================ */
2530
2531/* TCP ソケットが読み取り可能になるまで最大 wait_ms ミリ秒待機する。
2532 * 戻り値: 1 = データあり、0 = タイムアウト、-1 = エラー。 */
2533static int tcp_wait_readable(PotrSocket fd, uint32_t wait_ms)
2534{
2535 int ret;
2536#ifndef _WIN32
2537 fd_set rfds;
2538 struct timeval tv;
2539 FD_ZERO(&rfds);
2540 FD_SET(fd, &rfds);
2541 tv.tv_sec = (time_t)(wait_ms / 1000U);
2542 tv.tv_usec = (suseconds_t)((wait_ms % 1000U) * 1000U);
2543 ret = select(fd + 1, &rfds, NULL, NULL, &tv);
2544 if (ret < 0 && errno == EINTR) return 0; /* シグナル割り込み: タイムアウトとして扱う */
2545 if (ret < 0) return -1;
2546#else /* _WIN32 */
2547 fd_set rfds;
2548 struct timeval tv;
2549 FD_ZERO(&rfds);
2550 FD_SET(fd, &rfds);
2551 tv.tv_sec = (long)(wait_ms / 1000U);
2552 tv.tv_usec = (long)((wait_ms % 1000U) * 1000U);
2553 ret = select(0, &rfds, NULL, NULL, &tv);
2554 if (ret == SOCKET_ERROR) return -1;
2555#endif /* _WIN32 */
2556 return (ret > 0) ? 1 : 0;
2557}
2558
2559/* TCP ソケットから正確に n バイト読み取る。
2560 * 戻り値: 1 = 成功、0 = 切断 (recv が 0 を返した)、-1 = エラー。 */
2561static int tcp_read_all(PotrSocket fd, uint8_t *buf, size_t n)
2562{
2563 size_t received = 0;
2564 while (received < n)
2565 {
2566 int r;
2567#ifndef _WIN32
2568 r = (int)recv(fd, buf + received, n - received, 0);
2569 if (r < 0) return -1;
2570 if (r == 0) return 0;
2571#else /* _WIN32 */
2572 r = recv(fd, (char *)(buf + received), (int)(n - received), 0);
2573 if (r == SOCKET_ERROR) return -1;
2574 if (r == 0) return 0;
2575#endif /* _WIN32 */
2576 received += (size_t)r;
2577 }
2578 return 1;
2579}
2580
2581/* TCP 受信スレッドに渡す引数 (path ごと) */
2582typedef struct
2583{
2586 int _pad;
2587} TcpRecvArg;
2588
2590
2591/* TCP 用 CONNECTED イベント: tcp_state_mutex で二重発火を防止する。
2592 複数 path のスレッドが並行して呼び出し得るため mutex 保護が必要。 */
2593static void notify_connected_tcp(struct PotrContext_ *ctx)
2594{
2596 if (!ctx->health_alive)
2597 {
2598 ctx->health_alive = 1;
2601 "tcp_recv[service_id=%" PRId64 "]: CONNECTED",
2602 ctx->service.service_id);
2603 if (ctx->callback != NULL)
2604 {
2606 POTR_EVENT_CONNECTED, NULL, 0);
2607 }
2608 }
2609 else
2610 {
2612 }
2613}
2614
2615/* TCP ソケットに n バイト書き込む (tcp_send_mutex は呼び出し元が保持済み)。
2616 * 戻り値: 0 = 成功、-1 = エラー。 */
2617static int tcp_send_all_raw(PotrSocket fd, const uint8_t *buf, size_t n)
2618{
2619 size_t sent = 0;
2620 while (sent < n)
2621 {
2622 int r;
2623#ifndef _WIN32
2624 r = (int)send(fd, buf + sent, n - sent, 0);
2625 if (r <= 0) return -1;
2626#else /* _WIN32 */
2627 r = send(fd, (const char *)(buf + sent), (int)(n - sent), 0);
2628 if (r == SOCKET_ERROR) return -1;
2629#endif /* _WIN32 */
2630 sent += (size_t)r;
2631 }
2632 return 0;
2633}
2634
2635/* TCP 上で PING 応答パケットを送信する。tcp_send_mutex[path_idx] で送信を保護する。
2636 * req_seq_num: 受信した PING 要求の seq_num (ホストオーダー)。 */
2637static void tcp_send_ping_reply(struct PotrContext_ *ctx, int path_idx,
2638 uint32_t req_seq_num)
2639{
2640 PotrPacket reply_pkt;
2642 uint32_t my_next_seq;
2643
2644 shdr.service_id = ctx->service.service_id;
2645 shdr.session_id = ctx->session_id;
2646 shdr.session_tv_sec = ctx->session_tv_sec;
2647 shdr.session_tv_nsec = ctx->session_tv_nsec;
2648
2650 my_next_seq = ctx->send_window.next_seq;
2652
2653 /* ack_num=0 は PING 要求と区別できないため req_seq_num+1 を格納する */
2654 if (packet_build_ping(&reply_pkt, &shdr,
2655 my_next_seq, req_seq_num + 1U) != POTR_SUCCESS)
2656 {
2657 return;
2658 }
2659
2660 POTR_MUTEX_LOCK_LOCAL(&ctx->tcp_send_mutex[path_idx]);
2661
2662 if (ctx->tcp_conn_fd[path_idx] == POTR_INVALID_SOCKET)
2663 {
2664 POTR_MUTEX_UNLOCK_LOCAL(&ctx->tcp_send_mutex[path_idx]);
2665 return;
2666 }
2667
2668 if (ctx->service.encrypt_enabled)
2669 {
2670 uint8_t wire_buf[PACKET_HEADER_SIZE + POTR_CRYPTO_TAG_SIZE];
2671 uint8_t nonce[POTR_CRYPTO_NONCE_SIZE];
2672 size_t enc_out = POTR_CRYPTO_TAG_SIZE;
2673
2674 reply_pkt.flags |= htons(POTR_FLAG_ENCRYPTED);
2675 reply_pkt.payload_len = htons((uint16_t)POTR_CRYPTO_TAG_SIZE);
2676
2677 /* ノンス: session_id(4B NBO) + flags(2B NBO) + seq_num(4B NBO) + padding(2B) */
2678 memcpy(nonce, &reply_pkt.session_id, 4);
2679 memcpy(nonce + 4, &reply_pkt.flags, 2);
2680 memcpy(nonce + 6, &reply_pkt.seq_num, 4);
2681 memset(nonce + 10, 0, 2);
2682
2683 memcpy(wire_buf, &reply_pkt, PACKET_HEADER_SIZE);
2684 if (potr_encrypt(wire_buf + PACKET_HEADER_SIZE, &enc_out,
2685 NULL, 0,
2686 ctx->service.encrypt_key,
2687 nonce,
2688 wire_buf, PACKET_HEADER_SIZE) == 0)
2689 {
2690 (void)tcp_send_all_raw(ctx->tcp_conn_fd[path_idx],
2691 wire_buf, PACKET_HEADER_SIZE + enc_out);
2692 }
2693 }
2694 else
2695 {
2696 uint8_t wire_buf[PACKET_HEADER_SIZE];
2697 memcpy(wire_buf, &reply_pkt, PACKET_HEADER_SIZE);
2698 (void)tcp_send_all_raw(ctx->tcp_conn_fd[path_idx], wire_buf, PACKET_HEADER_SIZE);
2699 }
2700
2701 POTR_MUTEX_UNLOCK_LOCAL(&ctx->tcp_send_mutex[path_idx]);
2702}
2703
2704/* TCP ストリーム受信スレッド本体 (path ごと) */
2705#ifndef _WIN32
2706static void *tcp_recv_thread_func(void *arg)
2707#else /* _WIN32 */
2708static DWORD WINAPI tcp_recv_thread_func(LPVOID arg)
2709#endif /* _WIN32 */
2710{
2711 TcpRecvArg *rarg = (TcpRecvArg *)arg;
2712 struct PotrContext_ *ctx = rarg->ctx;
2713 int path_idx = rarg->path_idx;
2714 uint8_t *buf = ctx->recv_buf; /* PACKET_HEADER_SIZE + max_payload バイト */
2715 PotrSocket fd;
2716
2717 /* PING 要求到着タイムアウト監視を使用するか判定する。
2718 * - tcp RECEIVER: SENDER が PING を周期送信するため監視が必要。
2719 * - tcp_bidir SENDER: RECEIVER も PING を周期送信するため、SENDER 側でも監視が必要。
2720 * - tcp_bidir RECEIVER: 同上。
2721 * - tcp SENDER: RECEIVER は PING を送信しないため監視不要。 */
2722 int use_ping_timeout = ((ctx->role == POTR_ROLE_RECEIVER
2723 || ctx->service.type == POTR_TYPE_TCP_BIDIR)
2724 && ctx->global.health_timeout_ms > 0);
2725 /* ポーリング間隔: 1 秒単位でチェックし、health_timeout_ms を超えないようにする。 */
2726 uint32_t poll_ms = use_ping_timeout
2727 ? (ctx->global.health_timeout_ms < 1000U
2729 : 1000U)
2730 : 0U;
2731
2733 "tcp_recv[service_id=%" PRId64 " path=%d]: starting (ping_req_timeout=%s)",
2734 ctx->service.service_id, path_idx,
2735 use_ping_timeout ? "enabled" : "disabled");
2736
2737 while (ctx->running[path_idx])
2738 {
2739 PotrPacket pkt;
2740 uint16_t wire_payload_len;
2741 int r;
2742
2743 fd = ctx->tcp_conn_fd[path_idx];
2744 if (fd == POTR_INVALID_SOCKET)
2745 {
2746 break;
2747 }
2748
2749 /* ── 先読みバッファ処理 ──
2750 * accept スレッドが session 判定のために読み取ったパケットが残っている場合、
2751 * ソケットからの読み取りをスキップしてバッファの内容をそのまま使用する。
2752 * accept スレッドは recv スレッド起動前に書き込みを完了しているため mutex 不要。 */
2753 if (ctx->tcp_first_pkt_len[path_idx] > 0)
2754 {
2755 /* accept スレッドの先読みバッファを recv バッファにコピーする */
2756 size_t first_len = ctx->tcp_first_pkt_len[path_idx];
2757 memcpy(buf, ctx->tcp_first_pkt_buf[path_idx], first_len);
2758 ctx->tcp_first_pkt_len[path_idx] = 0; /* 先読みバッファをクリア */
2759 {
2760 uint16_t wpl;
2761 memcpy(&wpl, buf + 30, sizeof(wpl));
2762 wire_payload_len = ntohs(wpl);
2763 }
2764 }
2765 else
2766 {
2767 /* RECEIVER: タイムアウト付きポーリングで PING 要求到着を監視する。
2768 * データが届くまで poll_ms 待機し、タイムアウト時は PING 到着時刻を確認する。 */
2769 if (use_ping_timeout)
2770 {
2771 int readable = tcp_wait_readable(fd, poll_ms);
2772 if (!ctx->running[path_idx]) break;
2773 if (readable < 0) break; /* エラー */
2774 if (readable == 0)
2775 {
2776 /* ポーリングタイムアウト: PING 要求到着時刻を確認する */
2777 uint64_t last = ctx->tcp_last_ping_req_recv_ms[path_idx];
2778 uint64_t elapsed = get_ms() - last;
2779 if (last > 0 && elapsed > (uint64_t)ctx->global.health_timeout_ms)
2780 {
2782 "tcp_recv[service_id=%" PRId64 " path=%d]: PING req timeout"
2783 " (%llu ms), disconnecting",
2784 ctx->service.service_id, path_idx,
2785 (unsigned long long)elapsed);
2786 break;
2787 }
2788 continue;
2789 }
2790 /* readable == 1: データあり。以降の tcp_read_all へ進む。 */
2791 }
2792
2793 /* 1. ヘッダー 32B 読み取り */
2794 r = tcp_read_all(fd, buf, PACKET_HEADER_SIZE);
2795 if (r <= 0)
2796 {
2797 break; /* 切断 or エラー */
2798 }
2799
2800 /* 2. ペイロード長を wire バイト列から抽出 (offset 30、NBO) */
2801 {
2802 uint16_t wpl;
2803 memcpy(&wpl, buf + 30, sizeof(wpl));
2804 wire_payload_len = ntohs(wpl);
2805 }
2806
2807 /* 3. ペイロード長バリデーション */
2808 if ((size_t)wire_payload_len > ctx->global.max_payload)
2809 {
2811 "tcp_recv[service_id=%" PRId64 "]: oversized payload %u > max %u,"
2812 " disconnecting",
2813 ctx->service.service_id,
2814 (unsigned)wire_payload_len,
2815 (unsigned)ctx->global.max_payload);
2816 break;
2817 }
2818
2819 /* 4. ペイロード読み取り */
2820 if (wire_payload_len > 0)
2821 {
2822 r = tcp_read_all(fd, buf + PACKET_HEADER_SIZE, wire_payload_len);
2823 if (r <= 0)
2824 {
2825 break;
2826 }
2827 }
2828 } /* else (先読みバッファなし) ここまで */
2829
2830 /* 5. パケット解析 */
2831 if (packet_parse(&pkt, buf,
2832 PACKET_HEADER_SIZE + wire_payload_len) != POTR_SUCCESS)
2833 {
2835 "tcp_recv[service_id=%" PRId64 "]: packet_parse failed",
2836 ctx->service.service_id);
2837 break;
2838 }
2839
2840 /* 6. service_id チェック */
2841 if (pkt.service_id != ctx->service.service_id)
2842 {
2844 "tcp_recv[service_id=%" PRId64 "]: service_id mismatch (%" PRId64 ")",
2845 ctx->service.service_id, pkt.service_id);
2846 continue;
2847 }
2848
2849 /* 7. DATA 暗号化復号 (DATA+ENCRYPTED の組み合わせのみ) */
2852 {
2853 uint8_t nonce[POTR_CRYPTO_NONCE_SIZE];
2854 size_t dec_len = ctx->crypto_buf_size;
2855 uint32_t sid_nbo = htonl(pkt.session_id);
2856 uint16_t flg_nbo = htons((uint16_t)pkt.flags);
2857 uint32_t seq_nbo = htonl(pkt.seq_num);
2858
2859 memcpy(nonce, &sid_nbo, 4);
2860 memcpy(nonce + 4, &flg_nbo, 2);
2861 memcpy(nonce + 6, &seq_nbo, 4);
2862 memset(nonce + 10, 0, 2);
2863
2864 if (potr_decrypt(ctx->crypto_buf, &dec_len,
2865 pkt.payload, pkt.payload_len,
2866 ctx->service.encrypt_key,
2867 nonce,
2868 buf, PACKET_HEADER_SIZE) != 0)
2869 {
2871 "tcp_recv[service_id=%" PRId64 "]: decrypt failed seq=%u",
2872 ctx->service.service_id, (unsigned)pkt.seq_num);
2873 continue;
2874 }
2875
2876 pkt.payload = ctx->crypto_buf;
2877 pkt.payload_len = (uint16_t)dec_len;
2878 }
2879
2880 /* 8. パケット種別処理 */
2881 if (pkt.flags & POTR_FLAG_PING)
2882 {
2883 if (pkt.ack_num == 0U)
2884 {
2885 /* PING 要求: 即応答を返す。最終受信時刻を更新する。 */
2887 "tcp_recv[service_id=%" PRId64 " path=%d]: PING req seq=%u -> reply",
2888 ctx->service.service_id, path_idx, (unsigned)pkt.seq_num);
2890 ctx->tcp_last_ping_req_recv_ms[path_idx] = get_ms();
2891 tcp_send_ping_reply(ctx, path_idx, pkt.seq_num);
2892 }
2893 else
2894 {
2895 /* PING 応答: 最終受信時刻を更新 */
2896 ctx->tcp_last_ping_recv_ms[path_idx] = get_ms();
2899 "tcp_recv[service_id=%" PRId64 " path=%d]: PING resp seq=%u ack=%u",
2900 ctx->service.service_id, path_idx,
2901 (unsigned)pkt.seq_num, (unsigned)pkt.ack_num);
2902 }
2903 }
2904 else if (pkt.flags & POTR_FLAG_DATA)
2905 {
2906 /* セッション照合 + recv_window への投入 (recv_window_mutex で保護) */
2907 {
2908 int pushed;
2909
2911 if (!check_and_update_session(ctx, &pkt))
2912 {
2915 "tcp_recv[service_id=%" PRId64 " path=%d]: DATA session mismatch, ignored",
2916 ctx->service.service_id, path_idx);
2917 continue;
2918 }
2919 pushed = window_recv_push(&ctx->recv_window, &pkt);
2921
2922 if (pushed != POTR_SUCCESS)
2923 {
2924 /* 重複パケット → スキップ */
2926 "tcp_recv[service_id=%" PRId64 " path=%d]: DATA seq=%u duplicate, skipped",
2927 ctx->service.service_id, path_idx, (unsigned)pkt.seq_num);
2928 continue;
2929 }
2930
2932
2934 "tcp_recv[service_id=%" PRId64 " path=%d]: DATA seq=%u payload=%u",
2935 ctx->service.service_id, path_idx,
2936 (unsigned)pkt.seq_num, (unsigned)pkt.payload_len);
2937
2938 /* 順序整列済みパケットをポップして配信 */
2940 {
2941 PotrPacket out;
2942 while (window_recv_pop(&ctx->recv_window, &out) == POTR_SUCCESS)
2943 {
2944 size_t offset = 0;
2945 PotrPacket elem;
2947 while (packet_unpack_next(&out, &offset, &elem) == POTR_SUCCESS)
2948 {
2949 deliver_payload_elem(ctx, &elem);
2950 }
2952 }
2953 }
2955 }
2956 }
2957 }
2958
2959 /* 接続断処理: DISCONNECTED イベントは connect スレッドが tcp_active_paths == 0 時に発火する */
2960 ctx->running[path_idx] = 0;
2961
2963 "tcp_recv[service_id=%" PRId64 " path=%d]: exited",
2964 ctx->service.service_id, path_idx);
2965
2966#ifndef _WIN32
2967 return NULL;
2968#else /* _WIN32 */
2969 return 0;
2970#endif /* _WIN32 */
2971}
2972
2981{
2982 if (ctx == NULL)
2983 {
2984 return POTR_ERROR;
2985 }
2986
2987 ctx->running[0] = 1;
2988
2990 "recv_thread[service_id=%" PRId64 "]: starting",
2991 ctx->service.service_id);
2992
2993#ifndef _WIN32
2994 {
2995 int rc = pthread_create(&ctx->recv_thread[0], NULL, recv_thread_func, ctx);
2996 if (rc != 0)
2997 {
2998 ctx->running[0] = 0;
3000 "recv_thread[service_id=%" PRId64 "]: pthread_create failed",
3001 ctx->service.service_id);
3002 return POTR_ERROR;
3003 }
3004 }
3005#else /* _WIN32 */
3006 ctx->recv_thread[0] = CreateThread(NULL, 0, recv_thread_func, ctx, 0, NULL);
3007 if (ctx->recv_thread[0] == NULL)
3008 {
3009 ctx->running[0] = 0;
3011 "recv_thread[service_id=%" PRId64 "]: CreateThread failed",
3012 ctx->service.service_id);
3013 return POTR_ERROR;
3014 }
3015#endif /* _WIN32 */
3016
3017 return POTR_SUCCESS;
3018}
3019
3028{
3029 if (ctx == NULL)
3030 {
3031 return POTR_ERROR;
3032 }
3033
3034 ctx->running[0] = 0;
3035
3036#ifndef _WIN32
3037 {
3038 int i;
3039 for (i = 0; i < ctx->n_path; i++)
3040 {
3041 if (ctx->sock[i] != POTR_INVALID_SOCKET)
3042 {
3043 shutdown(ctx->sock[i], SHUT_RD);
3044 }
3045 }
3046 pthread_join(ctx->recv_thread[0], NULL);
3047 }
3048#else /* _WIN32 */
3049 {
3050 int i;
3051 for (i = 0; i < ctx->n_path; i++)
3052 {
3053 if (ctx->sock[i] != POTR_INVALID_SOCKET)
3054 {
3055 closesocket(ctx->sock[i]);
3056 ctx->sock[i] = POTR_INVALID_SOCKET;
3057 }
3058 }
3059 if (ctx->recv_thread[0] != NULL)
3060 {
3061 WaitForSingleObject(ctx->recv_thread[0], 3000);
3062 CloseHandle(ctx->recv_thread[0]);
3063 ctx->recv_thread[0] = NULL;
3064 }
3065 }
3066#endif /* _WIN32 */
3067
3068 return POTR_SUCCESS;
3069}
3070
3079int tcp_recv_thread_start(struct PotrContext_ *ctx, int path_idx)
3080{
3081 if (ctx == NULL) { return POTR_ERROR; }
3082
3083 ctx->running[path_idx] = 1;
3084
3085 s_tcp_recv_args[path_idx].ctx = ctx;
3086 s_tcp_recv_args[path_idx].path_idx = path_idx;
3087
3089 "tcp_recv_thread[service_id=%" PRId64 " path=%d]: starting",
3090 ctx->service.service_id, path_idx);
3091
3092#ifndef _WIN32
3093 {
3094 int rc = pthread_create(&ctx->recv_thread[path_idx], NULL,
3096 &s_tcp_recv_args[path_idx]);
3097 if (rc != 0)
3098 {
3099 ctx->running[path_idx] = 0;
3101 "tcp_recv_thread[service_id=%" PRId64 " path=%d]: pthread_create failed",
3102 ctx->service.service_id, path_idx);
3103 return POTR_ERROR;
3104 }
3105 }
3106#else /* _WIN32 */
3107 ctx->recv_thread[path_idx] = CreateThread(NULL, 0,
3109 &s_tcp_recv_args[path_idx], 0, NULL);
3110 if (ctx->recv_thread[path_idx] == NULL)
3111 {
3112 ctx->running[path_idx] = 0;
3114 "tcp_recv_thread[service_id=%" PRId64 " path=%d]: CreateThread failed",
3115 ctx->service.service_id, path_idx);
3116 return POTR_ERROR;
3117 }
3118#endif /* _WIN32 */
3119
3120 return POTR_SUCCESS;
3121}
3122
3133int tcp_recv_thread_stop(struct PotrContext_ *ctx, int path_idx)
3134{
3135 if (ctx == NULL) { return POTR_ERROR; }
3136
3137 ctx->running[path_idx] = 0;
3138
3139#ifndef _WIN32
3140 pthread_join(ctx->recv_thread[path_idx], NULL);
3141#else /* _WIN32 */
3142 if (ctx->recv_thread[path_idx] != NULL)
3143 {
3144 WaitForSingleObject(ctx->recv_thread[path_idx], INFINITE);
3145 CloseHandle(ctx->recv_thread[path_idx]);
3146 ctx->recv_thread[path_idx] = NULL;
3147 }
3148#endif /* _WIN32 */
3149
3150 return POTR_SUCCESS;
3151}
データ圧縮・解凍モジュールの内部ヘッダー。
int potr_decompress(uint8_t *dst, size_t *dst_len, const uint8_t *src, size_t src_len)
圧縮データを解凍します。
データ暗号化・復号モジュールの内部ヘッダー。
int potr_encrypt(uint8_t *dst, size_t *dst_len, const uint8_t *src, size_t src_len, const uint8_t *key, const uint8_t *nonce, const uint8_t *aad, size_t aad_len)
AES-256-GCM でデータを暗号化します。
int potr_decrypt(uint8_t *dst, size_t *dst_len, const uint8_t *src, size_t src_len, const uint8_t *key, const uint8_t *nonce, const uint8_t *aad, size_t aad_len)
AES-256-GCM でデータを復号し、認証タグを検証します。
#define POTR_CRYPTO_NONCE_SIZE
AES-256-GCM ノンスサイズ (バイト)。session_id (4B NBO) + flags (2B NBO) + seq_or_ack_num (4B NBO) + padding (2B...
#define POTR_CRYPTO_TAG_SIZE
AES-256-GCM 認証タグサイズ (バイト)。暗号文の直後に付加する。
#define POTR_FLAG_MORE_FRAG
後続フラグメントが存在することを示すペイロードエレメントフラグ。メッセージが複数ペイロードエレメントに分割された場合、最終フラグメント以外に設定する。
#define POTR_FLAG_COMPRESSED
ペイロードが圧縮されていることを示すペイロードエレメントフラグ。圧縮はメッセージ単位で行い、全フラグメントのペイロードエレメントに設定する。先頭 4 バイトが元サイズ (NBO)、続くデータが raw ...
#define POTR_MAX_PATH
マルチパスの最大パス数。
#define POTR_FLAG_REJECT
再送不能通知パケットであることを示すフラグ。ack_num に再送不能な通番を格納する。
#define POTR_FLAG_PING
ヘルスチェック要求パケットであることを示すフラグ。ペイロードなし。
#define POTR_FLAG_DATA
データパケット (パックコンテナ) であることを示すフラグ。常に設定される。
#define POTR_FLAG_ENCRYPTED
AES-256-GCM 認証タグが付与されていることを示す外側パケットフラグ。 POTR_FLAG_DATA と組み合わせる場合: [ヘッダー 32B][暗号文: packed_len B][GCM ...
#define POTR_FLAG_FIN
正常終了通知パケットであることを示すフラグ。送信者が potrCloseService 時に送出し、受信者は即座に DISCONNECTED へ遷移する。ペイロードなし。
#define POTR_FLAG_NACK
再送要求パケットであることを示すフラグ。ack_num に要求通番を格納する。
#define POTR_PEER_NA
ピア ID 未割当を示す予約値。 1:1 モードのコールバックで渡される (ピアの概念がない)。 potrSend() に N:1 モードで指定した場合はエラーを返す。
#define POTR_SUCCESS
成功の戻り値を表す定数。
#define POTR_ERROR
失敗の戻り値を表す定数。
int packet_build_ping(PotrPacket *packet, const PotrPacketSessionHdr *shdr, uint32_t seq_num, uint32_t ack_num)
PING パケットを構築します。
Definition packet.c:99
int packet_parse(PotrPacket *packet, const void *buf, size_t buf_len)
受信バイト列をパケット構造体に解析します。
Definition packet.c:299
int packet_build_nack(PotrPacket *packet, const PotrPacketSessionHdr *shdr, uint32_t nack_num)
NACK パケットを構築します。
Definition packet.c:62
int packet_build_reject(PotrPacket *packet, const PotrPacketSessionHdr *shdr, uint32_t seq_num)
再送不能通知 (REJECT) パケットを構築します。
Definition packet.c:132
int packet_unpack_next(const PotrPacket *container, size_t *offset, PotrPacket *elem_out)
データパケットから次のペイロードエレメントを取り出します。
Definition packet.c:238
size_t packet_wire_size(const PotrPacket *packet)
パケットのヘッダー + ペイロードの合計バイト数を返します。
Definition packet.c:347
パケット構築・解析モジュールの内部ヘッダー。
#define PACKET_HEADER_SIZE
パケットヘッダーの固定長 (バイト)。payload フィールドの開始オフセット。
Definition packet.h:23
通信ライブラリの定数ファイル。
@ POTR_TRACE_ERROR
エラー。操作の失敗を記録。TRACE_LV_ERROR (1) と同値。
@ POTR_TRACE_INFO
情報。TRACE_LV_INFO (3) と同値。
@ POTR_TRACE_VERBOSE
詳細情報 (デバッグ)。TRACE_LV_VERBOSE (4) と同値。
@ POTR_TRACE_WARNING
警告。回復可能な異常を記録。TRACE_LV_WARNING (2) と同値。
@ POTR_TYPE_BROADCAST
1:N 通信 (UDP ブロードキャスト)。
Definition porter_type.h:92
@ POTR_TYPE_TCP_BIDIR
TCP 双方向通信 (両端が potrSend 可)。
@ POTR_TYPE_UNICAST_BIDIR
双方向 1:1 通信 (UDP ユニキャスト)。
Definition porter_type.h:94
@ POTR_TYPE_MULTICAST
1:N 通信 (UDP マルチキャスト)。
Definition porter_type.h:91
@ POTR_ROLE_SENDER
送信者。
@ POTR_ROLE_RECEIVER
受信者。
uint32_t PotrPeerId
ピア識別子。
Definition porter_type.h:32
@ POTR_EVENT_DISCONNECTED
切断を検知 (タイムアウト / FIN 受信 / REJECT 受信)。data=NULL, len=0。
@ POTR_EVENT_CONNECTED
送信者からの疎通を初検知 or 復帰。data=NULL, len=0。
@ POTR_EVENT_DATA
データ受信。data/len に内容が格納される。
セッションコンテキスト内部定義ヘッダー。
struct PotrPeerContext_ PotrPeerContext
N:1 モードにおける個別ピアのコンテキスト。
static int potr_is_raw_type(PotrType t)
RAW 系通信種別 (POTR_TYPE_*_RAW) か判定する。
Definition potrContext.h:54
#define POTR_NACK_DEDUP_SLOTS
NACK 重複抑制リングバッファのスロット数 (POTR_MAX_PATH × 2)。
Definition potrContext.h:81
#define POTR_INVALID_SOCKET
Definition potrContext.h:36
int PotrSocket
Definition potrContext.h:32
pthread_mutex_t PotrMutexLocal
porter 内部ログマクロ定義ヘッダー。
#define POTR_LOG(level,...)
porter 内部ログ出力マクロ。
Definition potrLog.h:68
PotrPeerContext * peer_find_by_session(struct PotrContext_ *ctx, uint32_t session_id, int64_t session_tv_sec, int32_t session_tv_nsec)
session_triplet でピアを検索する。
void peer_path_clear(struct PotrContext_ *ctx, PotrPeerContext *peer, int path_idx)
ピアの特定パスをクリアしてスロットを未使用に戻す。
PotrPeerContext * peer_create(struct PotrContext_ *ctx, const struct sockaddr_in *sender_addr, int path_idx)
新規ピアを作成する。
void peer_free(struct PotrContext_ *ctx, PotrPeerContext *peer)
ピアリソースを解放してスロットをクリアする。
N:1 モード用ピアテーブル管理モジュールの内部ヘッダー。
static void get_monotonic(int64_t *tv_sec, int32_t *tv_nsec)
static void n1_send_ping_reply(struct PotrContext_ *ctx, PotrPeerContext *peer, uint32_t req_seq_num)
int comm_recv_thread_start(struct PotrContext_ *ctx)
非 TCP 受信スレッドを起動します。
static int reorder_gap_ready(struct PotrContext_ *ctx, uint32_t nack_num)
static void send_reject(struct PotrContext_ *ctx, uint32_t seq_num)
static void n1_send_nack(struct PotrContext_ *ctx, PotrPeerContext *peer, uint32_t nack_seq)
#define POTR_MUTEX_LOCK_LOCAL(m)
#define POTR_NACK_DEDUP_MS
NACK 重複抑制の時間窓 (ミリ秒)。この時間内の同一 ack_num の NACK は破棄する。
static void drain_recv_window(struct PotrContext_ *ctx)
static void * recv_thread_func(void *arg)
static uint64_t get_ms_mono(void)
static void n1_process_outer_pkt(struct PotrContext_ *ctx, PotrPeerContext *peer, const PotrPacket *pkt)
static int tcp_send_all_raw(PotrSocket fd, const uint8_t *buf, size_t n)
static int tcp_read_all(PotrSocket fd, uint8_t *buf, size_t n)
static void send_ping_reply(struct PotrContext_ *ctx, uint32_t req_seq_num)
int tcp_recv_thread_start(struct PotrContext_ *ctx, int path_idx)
TCP 受信スレッドを path ごとに起動します。
#define POTR_MUTEX_UNLOCK_LOCAL(m)
static void process_outer_pkt(struct PotrContext_ *ctx, const PotrPacket *pkt)
static void n1_drain_recv_window(struct PotrContext_ *ctx, PotrPeerContext *peer)
static void tcp_send_ping_reply(struct PotrContext_ *ctx, int path_idx, uint32_t req_seq_num)
static void recv_deliver(struct PotrContext_ *ctx, const uint8_t *payload, size_t payload_len, int compressed)
static void check_reorder_timeout(struct PotrContext_ *ctx)
static void n1_notify_health_alive(struct PotrContext_ *ctx, PotrPeerContext *peer)
int comm_recv_thread_stop(struct PotrContext_ *ctx)
非 TCP 受信スレッドを停止します。
static void update_path_recv(struct PotrContext_ *ctx, int path_idx, const struct sockaddr_in *sender)
static void n1_recv_deliver(struct PotrContext_ *ctx, PotrPeerContext *peer, const uint8_t *payload, size_t payload_len, int compressed)
static void notify_connected_tcp(struct PotrContext_ *ctx)
static void * tcp_recv_thread_func(void *arg)
static void n1_deliver_payload_elem(struct PotrContext_ *ctx, PotrPeerContext *peer, const PotrPacket *elem)
static int check_and_update_session(struct PotrContext_ *ctx, const PotrPacket *pkt)
static int n1_reorder_gap_ready(PotrPeerContext *peer, uint32_t nack_num)
static void n1_check_health_timeout(struct PotrContext_ *ctx)
int tcp_recv_thread_stop(struct PotrContext_ *ctx, int path_idx)
TCP 受信スレッドの終了を待機します。
static void check_health_timeout(struct PotrContext_ *ctx)
static int n1_check_and_update_session(struct PotrContext_ *ctx, PotrPeerContext *peer, const PotrPacket *pkt)
static void raw_session_disconnect(struct PotrContext_ *ctx)
static TcpRecvArg s_tcp_recv_args[POTR_MAX_PATH]
static void notify_health_alive(struct PotrContext_ *ctx)
static void n1_update_path_recv(PotrPeerContext *peer, const struct sockaddr_in *sender_addr, int path_idx)
static uint64_t get_ms(void)
static int check_src_addr(const struct PotrContext_ *ctx, const struct sockaddr_in *sender)
static void deliver_payload_elem(struct PotrContext_ *ctx, const PotrPacket *elem)
static void send_nack(struct PotrContext_ *ctx, uint32_t nack_seq)
static void n1_send_reject(struct PotrContext_ *ctx, PotrPeerContext *peer, uint32_t seq_num)
static int tcp_wait_readable(PotrSocket fd, uint32_t wait_ms)
受信スレッド内部ヘッダー。
int seqnum_in_window(uint32_t seq, uint32_t base, uint16_t window_size)
通番 seq がウィンドウ [base, base + window_size) に含まれるか判定します。
Definition seqnum.c:80
通番管理モジュールの内部ヘッダー。
セッションコンテキスト構造体。PotrHandle の実体。
uint32_t session_id
自セッション識別子 (乱数)。
PotrRecvCallback callback
受信コールバック。
volatile uint64_t tcp_last_ping_req_recv_ms[POTR_MAX_PATH]
TCP PING 要求最終受信時刻 (ms, CLOCK_MONOTONIC 基準)。path ごと。接続確立時に現在時刻で初期化。
PotrGlobalConfig global
グローバル設定。
volatile uint64_t tcp_last_ping_recv_ms[POTR_MAX_PATH]
TCP PING 応答最終受信時刻 (ms, CLOCK_MONOTONIC 基準)。path ごと。接続確立時に現在時刻で初期化。
struct in_addr dst_addr_resolved[POTR_MAX_PATH]
解決済み宛先 IPv4 アドレス (unicast のみ)。
size_t compress_buf_size
compress_buf のサイズ (バイト)。
uint8_t * frag_buf
フラグメント結合バッファ (動的確保。max_message_size バイト)。
PotrMutex send_window_mutex
send_window 保護用ミューテックス (送信スレッド・ヘルスチェックスレッド・受信スレッドが競合するため)。
int64_t peer_session_tv_sec
追跡中の相手セッション開始時刻 秒部。
uint32_t reorder_nack_num
待機中の欠番通番。
int32_t peer_session_tv_nsec
追跡中の相手セッション開始時刻 ナノ秒部。
struct in_addr src_addr_resolved[POTR_MAX_PATH]
解決済み送信元 IPv4 アドレス。
int32_t session_tv_nsec
自セッション開始時刻 ナノ秒部。
PotrRole role
役割 (POTR_ROLE_SENDER / POTR_ROLE_RECEIVER)。
uint8_t * crypto_buf
暗号化・復号用一時バッファ (動的確保)。
uint8_t * compress_buf
圧縮・解凍用一時バッファ (動的確保)。
uint8_t * recv_buf
受信バッファ / 再送 wire 組立バッファ (動的確保。PACKET_HEADER_SIZE + max_payload バイト)。
volatile int running[POTR_MAX_PATH]
受信スレッド実行フラグ (1: 実行中, 0: 停止)。path ごと。
volatile int health_alive
疎通状態 (1: alive, 0: dead/未接続)。UDP 用。受信者が管理。
size_t frag_buf_len
フラグメント結合バッファの現在のデータ長 (バイト)。
PotrThread recv_thread[POTR_MAX_PATH]
受信スレッドハンドル (path ごと)。
uint32_t peer_session_id
追跡中の相手セッション識別子。
size_t tcp_first_pkt_len[POTR_MAX_PATH]
先読みパケットのバイト数 (0: 先読みなし)。
PotrMutex tcp_send_mutex[POTR_MAX_PATH]
TCP send() 排他制御 (path ごと)。送信スレッド・ヘルスチェックスレッド・recv スレッド競合防止。
int32_t reorder_deadline_nsec
タイムアウト期限 ナノ秒部。
int64_t reorder_deadline_sec
タイムアウト期限 秒部 (CLOCK_MONOTONIC)。
int32_t path_last_recv_nsec[POTR_MAX_PATH]
パスごとの最終受信時刻 ナノ秒部。
int frag_compressed
フラグメント受信中の圧縮フラグ (非 0: 圧縮あり)。
PotrMutex tcp_state_mutex
tcp_state_cv 保護用ミューテックス。tcp_active_paths のカウンタ更新も保護。
int is_multi_peer
1: N:1 モード (src_addr/src_port 省略), 0: 1:1 モード。
PotrServiceDef service
サービス定義。
uint16_t peer_port[POTR_MAX_PATH]
各パスで観測した送信者ポート (NBO)。0 = 未観測。
size_t crypto_buf_size
crypto_buf のサイズ (バイト)。
PotrMutex peers_mutex
ピアテーブル保護用ミューテックス。
uint8_t * tcp_first_pkt_buf[POTR_MAX_PATH]
先読みパケットバッファ (動的確保、PACKET_HEADER_SIZE + max_payload バイト)。
PotrSocket sock[POTR_MAX_PATH]
各パスの UDP ソケット。
int peer_session_known
相手セッションが初期化済みか (0: 未初期化)。
PotrWindow send_window
送信バッファ (過去 N パケット保持。NACK 再送・REJECT 判定に使用)。
int max_peers
ピアテーブルサイズ (service.max_peers から取得)。
int64_t path_last_recv_sec[POTR_MAX_PATH]
パスごとの最終受信時刻 秒部。0 = 未受信。
uint8_t nack_dedup_next
次に書き込むスロットインデックス。
int reorder_pending
リオーダー待機中か (1: 待機中、0: 待機なし)。
PotrMutex recv_window_mutex
recv_window 保護用ミューテックス。
struct sockaddr_in dest_addr[POTR_MAX_PATH]
送信先ソケットアドレス (送信者が sendto に使用)。
PotrWindow recv_window
受信ウィンドウ (順序整列・欠番検出)。
int64_t session_tv_sec
自セッション開始時刻 秒部。
int n_path
有効パス数。
PotrPeerContext * peers
ピアテーブル (動的確保。max_peers エントリ)。
PotrSocket tcp_conn_fd[POTR_MAX_PATH]
アクティブ TCP 接続 fd (path ごと)。
int32_t last_recv_tv_nsec
最終受信時刻 ナノ秒部。
PotrNackDedupEntry nack_dedup_buf[POTR_NACK_DEDUP_SLOTS]
NACK 重複抑制エントリ配列。
int64_t last_recv_tv_sec
最終受信時刻 秒部。0 = 未受信。
uint16_t window_size
スライディングウィンドウサイズ (パケット数)。
uint32_t reorder_timeout_ms
受信ウィンドウ欠番検出後、NACK または切断を遅延する時間 (ミリ秒)。マルチパスや近距離 WAN での追い越し吸収用。0 = 即時 (デフォルト)。推奨値: LAN/マルチパス=10〜30 ms、遠...
uint16_t max_payload
最大ペイロード長 (バイト)。
uint32_t health_timeout_ms
UDP 通信種別の受信タイムアウト (ミリ秒)。RECEIVER 側で使用。0 = 無効。設定ファイルキー: udp_health_timeout_ms。
uint32_t max_message_size
1 回の potrSend で送信できる最大メッセージ長 (バイト)。デフォルト: POTR_MAX_MESSAGE_SIZE。
NACK 重複抑制バッファの 1 エントリ。
Definition potrContext.h:85
uint64_t time_ms
処理時刻 (ms、単調増加)。0 = 未使用スロット。
Definition potrContext.h:88
uint32_t ack_num
再送または REJECT した ack_num。
Definition potrContext.h:86
パケットに付与するセッション識別情報。
Definition packet.h:31
int32_t session_tv_nsec
セッション開始時刻 ナノ秒部。
Definition packet.h:35
int64_t service_id
サービス識別子。
Definition packet.h:32
uint32_t session_id
セッション識別子 (乱数)。
Definition packet.h:34
int64_t session_tv_sec
セッション開始時刻 秒部。
Definition packet.h:33
ネットワーク送受信用パケット構造体。
uint32_t ack_num
再送要求番号 / 再送不能通番 (NBO)。NACK では要求通番、REJECT では再送不能通番を格納する。
int64_t session_tv_sec
セッション開始時刻 秒部 (NBO)。struct timespec の tv_sec 相当。
uint32_t session_id
セッション識別子 (NBO)。potrOpenService 時に決定する乱数。
uint16_t flags
パケット種別フラグ (POTR_FLAG_*) (NBO)。
const uint8_t * payload
ペイロードデータへのポインタ (読み取り専用)。ウィンドウプールまたは受信バッファ内を指す。
int32_t session_tv_nsec
セッション開始時刻 ナノ秒部 (NBO)。struct timespec の tv_nsec 相当。
uint32_t seq_num
通番。送信側が付与する連番 (NBO)。
int64_t service_id
サービス識別子 (NBO)。受信時に照合する。
uint16_t payload_len
ペイロード長 (バイト) (NBO)。
int reorder_pending
リオーダー待機中 (1: 待機中, 0: 待機なし)。
PotrWindow send_window
送信ウィンドウ (NACK 再送用)。
int32_t last_recv_tv_nsec
最終受信時刻 ナノ秒部。
size_t frag_buf_len
現在のデータ長。
int frag_compressed
圧縮フラグ (非 0: 圧縮あり)。
int32_t session_tv_nsec
自セッション開始時刻 ナノ秒部。
int64_t session_tv_sec
自セッション開始時刻 秒部。
int active
1: 有効スロット, 0: 空き。
uint8_t * frag_buf
フラグメント結合バッファ (動的確保)。
int64_t last_recv_tv_sec
最終受信時刻 秒部 (CLOCK_MONOTONIC)。0 = 未受信。
PotrWindow recv_window
受信ウィンドウ (順序整列)。
int n_paths
アクティブパス数。ループ境界には使わず管理カウンタとして使用する。
int64_t peer_session_tv_sec
追跡中のピアセッション開始時刻 秒部。
volatile int health_alive
疎通状態 (1: alive, 0: dead/未接続)。
struct sockaddr_in dest_addr[POTR_MAX_PATH]
送信先ソケットアドレス (インデックス = ctx->sock[] の添字)。未使用スロットは sin_family == 0。
uint32_t peer_session_id
追跡中のピアセッション識別子。
uint32_t session_id
自セッション識別子 (乱数)。
PotrPeerId peer_id
外部公開用ピア識別子 (単調増加カウンタから付与)。
int64_t path_last_recv_sec[POTR_MAX_PATH]
パスごとの最終受信時刻 秒部。未使用スロットは 0。
int peer_session_known
ピアセッションが初期化済みか (0: 未初期化)。
PotrMutex send_window_mutex
send_window 保護 (送信・受信・ヘルスチェックスレッド競合)。
int32_t peer_session_tv_nsec
追跡中のピアセッション開始時刻 ナノ秒部。
int32_t path_last_recv_nsec[POTR_MAX_PATH]
パスごとの最終受信時刻 ナノ秒部。
uint16_t src_port
送信者の送信元 bind ポート番号。0 = OS 自動選定。(全通信種別で省略可)
int encrypt_enabled
非 0 のとき暗号化有効。設定ファイルに有効な encrypt_key が存在するときに 1 に設定される。
char src_addr[POTR_MAX_PATH][POTR_MAX_ADDR_LEN]
送信元アドレス [0]=src_addr1 〜 [3]=src_addr4。送信者は bind / 送信インターフェース、受信者は送信元フィルタ。(全通信種別で必須)
PotrType type
通信種別。
int64_t service_id
サービス ID。
char dst_addr[POTR_MAX_PATH][POTR_MAX_ADDR_LEN]
宛先アドレス [0]=dst_addr1 〜 [3]=dst_addr4。送信者は送信先、受信者は bind アドレス。(unicast のみ)
uint8_t encrypt_key[POTR_CRYPTO_KEY_SIZE]
AES-256-GCM 事前共有鍵 (32 バイト)。encrypt_enabled が 0 の場合は未使用。
uint32_t base_seq
ウィンドウ先頭の通番。
Definition window.h:34
uint32_t next_seq
送信側: 次に割り当てる通番。受信側: 次に期待する通番。
Definition window.h:35
uint8_t * valid
バッファ有効フラグ配列 (動的確保。window_size バイト)。
Definition window.h:32
uint16_t window_size
ウィンドウサイズ (パケット数)。
Definition window.h:36
struct PotrContext_ * ctx
void window_recv_skip(PotrWindow *win, uint32_t seq_num)
受信ウィンドウで指定通番をスキップして次の通番へ前進させます。
Definition window.c:313
int window_init(PotrWindow *win, uint32_t initial_seq, uint16_t window_size, uint16_t max_payload)
ウィンドウを初期化します。
Definition window.c:42
int window_recv_needs_nack(const PotrWindow *win, uint32_t *nack_num)
受信ウィンドウで欠番が発生しているか確認し、NACK 番号を返します。
Definition window.c:336
void window_recv_reset(PotrWindow *win, uint32_t new_base_seq)
受信ウィンドウを新しい基点通番でリセットします。
Definition window.c:378
int window_send_get(const PotrWindow *win, uint32_t seq_num, PotrPacket *packet_out)
送信ウィンドウから指定通番のパケットを取得します (再送用)。
Definition window.c:202
int window_recv_pop(PotrWindow *win, PotrPacket *packet)
受信ウィンドウから順序整列済みパケットを取り出します。
Definition window.c:279
int window_recv_push(PotrWindow *win, const PotrPacket *packet)
受信ウィンドウにパケットを格納します。
Definition window.c:242
スライディングウィンドウ管理モジュールの内部ヘッダー。