Document of c-modernization-kit (porter) 1.0.0
Loading...
Searching...
No Matches
potrSendThread.c
Go to the documentation of this file.
1
34
35#ifndef _WIN32
36 #include <pthread.h>
37 #include <sys/socket.h>
38 #include <netinet/in.h>
39 #include <time.h>
40 #include <poll.h>
41#else /* _WIN32 */
42 #include <winsock2.h>
43 #include <windows.h>
44#endif /* _WIN32 */
45
46#include <string.h>
47#include <inttypes.h>
48
49#include <porter_const.h>
50
51#include "../potrContext.h"
52#include "../potrPeerTable.h"
54#include "potrSendThread.h"
55#include "../protocol/packet.h"
56#include "../protocol/window.h"
57#include "../infra/potrLog.h"
59
60/* 現在時刻をミリ秒単位で返す (単調増加クロック) */
61static uint64_t get_ms(void)
62{
63#ifndef _WIN32
64 struct timespec ts;
65 clock_gettime(CLOCK_MONOTONIC, &ts);
66 return (uint64_t)ts.tv_sec * 1000ULL + (uint64_t)ts.tv_nsec / 1000000ULL;
67#else /* _WIN32 */
68 return (uint64_t)GetTickCount64();
69#endif /* _WIN32 */
70}
71
72/* ペイロードエレメントを packed_buf に追記する */
73static void append_payload_elem(uint8_t *packed_buf, size_t *packed_len,
74 const PotrPayloadElem *entry)
75{
76 uint16_t flags_nbo = htons(entry->flags);
77 uint32_t plen_nbo = htonl((uint32_t)entry->payload_len);
78
79 memcpy(packed_buf + *packed_len, &flags_nbo, 2); *packed_len += 2;
80 memcpy(packed_buf + *packed_len, &plen_nbo, 4); *packed_len += 4;
81 memcpy(packed_buf + *packed_len, entry->payload, entry->payload_len);
82 *packed_len += entry->payload_len;
83}
84
85/* TCP 接続ソケットへ全バイトを確実に送信する。送信中は tcp_send_mutex を保持する。
86 戻り値: POTR_SUCCESS (全バイト送信成功) / POTR_ERROR (切断 or エラー)。 */
87static int tcp_send_all(PotrSocket fd, PotrMutex *mtx,
88 const uint8_t *buf, size_t len)
89{
90 size_t sent = 0;
91 int ret = POTR_SUCCESS;
92
93#ifndef _WIN32
94 pthread_mutex_lock(mtx);
95 while (sent < len)
96 {
97 ssize_t n = send(fd, buf + sent, len - sent, 0);
98 if (n <= 0) { ret = POTR_ERROR; break; }
99 sent += (size_t)n;
100 }
101 pthread_mutex_unlock(mtx);
102#else /* _WIN32 */
103 EnterCriticalSection(mtx);
104 while (sent < len)
105 {
106 int n = send(fd, (const char *)(buf + sent), (int)(len - sent), 0);
107 if (n <= 0) { ret = POTR_ERROR; break; }
108 sent += (size_t)n;
109 }
110 LeaveCriticalSection(mtx);
111#endif /* _WIN32 */
112
113 return ret;
114}
115
116/* send_wire_buf の [PACKET_HEADER_SIZE .. PACKET_HEADER_SIZE+packed_len-1] に
117 詰め済みのペイロードから外側コンテナを構築して送信する。
118 seq_num を付与する。UDP では再送バッファ (send_window) にも登録する。
119 send_wire_buf = [NBO ヘッダー 32B][packed_payload packed_len B] として組み立てる。 */
120static void flush_packed(struct PotrContext_ *ctx, size_t packed_len)
121{
122 PotrPacket outer_pkt;
124 uint32_t seq;
125 size_t wire_len;
126 uint8_t *packed_buf = ctx->send_wire_buf + PACKET_HEADER_SIZE;
127 int is_tcp = potr_is_tcp_type(ctx->service.type);
128
129 shdr.service_id = ctx->service.service_id;
130 shdr.session_id = ctx->session_id;
131 shdr.session_tv_sec = ctx->session_tv_sec;
133
134 /* send_window へのアクセスを排他制御する (送信スレッド・ヘルスチェックスレッド・受信スレッドが競合) */
135#ifndef _WIN32
136 pthread_mutex_lock(&ctx->send_window_mutex);
137#else /* _WIN32 */
138 EnterCriticalSection(&ctx->send_window_mutex);
139#endif /* _WIN32 */
140
141 seq = ctx->send_window.next_seq;
142
143 if (packet_build_packed(&outer_pkt, &shdr, seq, packed_buf, packed_len)
144 != POTR_SUCCESS)
145 {
146#ifndef _WIN32
147 pthread_mutex_unlock(&ctx->send_window_mutex);
148#else /* _WIN32 */
149 LeaveCriticalSection(&ctx->send_window_mutex);
150#endif /* _WIN32 */
151 return;
152 }
153
154 if (ctx->service.encrypt_enabled)
155 {
156 /* 暗号化パス:
157 * 1. ENCRYPTED フラグを OR (outer_pkt.flags は既に NBO)
158 * 2. payload_len を packed_len + TAG_SIZE に更新
159 * 3. nonce = session_id(4B NBO) + flags(2B NBO) + seq_num(4B NBO) + padding(2B)
160 * 4. AAD = outer_pkt ヘッダー 32B (NBO)
161 * 5. packed_buf → ctx->crypto_buf に暗号化
162 * 6. UDP: window_send_push に暗号化済みペイロードを登録
163 * TCP: ウィンドウ登録不要 (再送は TCP 層が担保); next_seq のみインクリメント
164 * 7. send_wire_buf に暗号化済みデータを組立て
165 */
166 uint8_t nonce[POTR_CRYPTO_NONCE_SIZE];
167 size_t enc_len = ctx->crypto_buf_size;
168
169 outer_pkt.flags |= htons(POTR_FLAG_ENCRYPTED);
170 outer_pkt.payload_len = htons((uint16_t)(packed_len + POTR_CRYPTO_TAG_SIZE));
171
172 /* ノンス: session_id(4B NBO) + flags(2B NBO) + seq_num(4B NBO) + padding(2B)
173 * outer_pkt の各フィールドは既に NBO */
174 memcpy(nonce, &outer_pkt.session_id, 4);
175 memcpy(nonce + 4, &outer_pkt.flags, 2);
176 memcpy(nonce + 6, &outer_pkt.seq_num, 4);
177 memset(nonce + 10, 0, 2);
178
179 if (potr_encrypt(ctx->crypto_buf, &enc_len,
180 packed_buf, packed_len,
181 ctx->service.encrypt_key,
182 nonce,
183 (const uint8_t *)&outer_pkt, PACKET_HEADER_SIZE) != 0)
184 {
185#ifndef _WIN32
186 pthread_mutex_unlock(&ctx->send_window_mutex);
187#else /* _WIN32 */
188 LeaveCriticalSection(&ctx->send_window_mutex);
189#endif /* _WIN32 */
191 "sender[service_id=%" PRId64 "]: encrypt failed seq=%u",
192 ctx->service.service_id, (unsigned)seq);
193 return;
194 }
195
196 if (is_tcp)
197 {
198 /* TCP: ウィンドウ登録不要。next_seq をインクリメントして mutex を解放 */
199 ctx->send_window.next_seq++;
200#ifndef _WIN32
201 pthread_mutex_unlock(&ctx->send_window_mutex);
202#else /* _WIN32 */
203 LeaveCriticalSection(&ctx->send_window_mutex);
204#endif /* _WIN32 */
205 }
206 else
207 {
208 /* window には暗号化済みペイロードを格納して NACK 再送時に再暗号化不要にする */
209 outer_pkt.payload = ctx->crypto_buf;
210 window_send_push(&ctx->send_window, &outer_pkt);
211#ifndef _WIN32
212 pthread_mutex_unlock(&ctx->send_window_mutex);
213#else /* _WIN32 */
214 LeaveCriticalSection(&ctx->send_window_mutex);
215#endif /* _WIN32 */
216 }
217
218 /* wire 組立: NBO ヘッダー + 暗号文 + タグ */
219 memcpy(ctx->send_wire_buf, &outer_pkt, PACKET_HEADER_SIZE);
220 memcpy(ctx->send_wire_buf + PACKET_HEADER_SIZE, ctx->crypto_buf, enc_len);
221 wire_len = PACKET_HEADER_SIZE + enc_len;
222
224 "sender[service_id=%" PRId64 "]: DATA(enc) seq=%u packed_len=%zu enc_len=%zu",
225 ctx->service.service_id, (unsigned)seq, packed_len, enc_len);
226 }
227 else
228 {
229 if (is_tcp)
230 {
231 /* TCP: ウィンドウ登録不要。next_seq をインクリメントして mutex を解放 */
232 ctx->send_window.next_seq++;
233#ifndef _WIN32
234 pthread_mutex_unlock(&ctx->send_window_mutex);
235#else /* _WIN32 */
236 LeaveCriticalSection(&ctx->send_window_mutex);
237#endif /* _WIN32 */
238 }
239 else
240 {
241 window_send_push(&ctx->send_window, &outer_pkt);
242#ifndef _WIN32
243 pthread_mutex_unlock(&ctx->send_window_mutex);
244#else /* _WIN32 */
245 LeaveCriticalSection(&ctx->send_window_mutex);
246#endif /* _WIN32 */
247 }
248
249 /* NBO ヘッダー (32B) を send_wire_buf 先頭に書き込む (ペイロードは既に直後に配置済み) */
250 memcpy(ctx->send_wire_buf, &outer_pkt, PACKET_HEADER_SIZE);
251 wire_len = PACKET_HEADER_SIZE + packed_len;
252
254 "sender[service_id=%" PRId64 "]: DATA seq=%u packed_len=%zu",
255 ctx->service.service_id, (unsigned)seq, packed_len);
256 }
257
258 if (is_tcp)
259 {
260 /* TCP v2: アクティブな全 path にループ送信する */
261 if (ctx->tcp_active_paths > 0)
262 {
263 int i;
264 for (i = 0; i < ctx->n_path; i++)
265 {
266 int pr;
267
268 if (ctx->tcp_conn_fd[i] == POTR_INVALID_SOCKET) continue;
269
270 /* 送信バッファの空きを確認 (非ブロッキング) */
271#ifndef _WIN32
272 {
273 struct pollfd pfd;
274 pfd.fd = ctx->tcp_conn_fd[i];
275 pfd.events = POLLOUT;
276 pfd.revents = 0;
277 pr = poll(&pfd, 1, 0);
278 if (pr > 0 && (pfd.revents & POLLOUT))
279 {
280 if (ctx->buf_full_suppress_cnt[i] > 0
281 && ++ctx->buf_full_suppress_cnt[i] > 10)
282 {
283 ctx->buf_full_suppress_cnt[i] = 0;
284 }
285 tcp_send_all(ctx->tcp_conn_fd[i], &ctx->tcp_send_mutex[i],
286 ctx->send_wire_buf, wire_len);
287 }
288 else
289 {
290 if (ctx->buf_full_suppress_cnt[i] == 0)
291 {
293 "send_thread[service_id=%" PRId64 "]: path[%d]"
294 " send buffer full, packet skipped",
295 ctx->service.service_id, i);
296 ctx->buf_full_suppress_cnt[i] = 1;
297 }
298 }
299 }
300#else /* _WIN32 */
301 {
302 WSAPOLLFD pfd;
303 pfd.fd = ctx->tcp_conn_fd[i];
304 pfd.events = POLLOUT;
305 pfd.revents = 0;
306 pr = WSAPoll(&pfd, 1, 0);
307 if (pr > 0 && (pfd.revents & POLLOUT))
308 {
309 if (ctx->buf_full_suppress_cnt[i] > 0
310 && ++ctx->buf_full_suppress_cnt[i] > 10)
311 {
312 ctx->buf_full_suppress_cnt[i] = 0;
313 }
314 tcp_send_all(ctx->tcp_conn_fd[i], &ctx->tcp_send_mutex[i],
315 ctx->send_wire_buf, wire_len);
316 }
317 else
318 {
319 if (ctx->buf_full_suppress_cnt[i] == 0)
320 {
322 "send_thread[service_id=%" PRId64 "]: path[%d]"
323 " send buffer full, packet skipped",
324 ctx->service.service_id, i);
325 ctx->buf_full_suppress_cnt[i] = 1;
326 }
327 }
328 }
329#endif /* _WIN32 */
330 }
331 }
332 }
333 else
334 {
335 int i;
336 for (i = 0; i < ctx->n_path; i++)
337 {
338#ifndef _WIN32
339 sendto(ctx->sock[i], ctx->send_wire_buf, wire_len, 0,
340 (const struct sockaddr *)&ctx->dest_addr[i],
341 sizeof(ctx->dest_addr[i]));
342#else /* _WIN32 */
343 sendto(ctx->sock[i], (const char *)ctx->send_wire_buf, (int)wire_len, 0,
344 (const struct sockaddr *)&ctx->dest_addr[i],
345 sizeof(ctx->dest_addr[i]));
346#endif /* _WIN32 */
347 }
348 }
349
350 /* ヘルスチェックスレッドが参照する最終送信時刻を更新し、
351 スリープ中のヘルスチェックスレッドを起床させてタイマーをリセットする */
352 ctx->last_send_ms = get_ms();
353
354 if (ctx->health_running[0])
355 {
356#ifndef _WIN32
357 pthread_mutex_lock(&ctx->health_mutex[0]);
358 pthread_cond_signal(&ctx->health_wakeup[0]);
359 pthread_mutex_unlock(&ctx->health_mutex[0]);
360#else /* _WIN32 */
361 EnterCriticalSection(&ctx->health_mutex[0]);
362 WakeConditionVariable(&ctx->health_wakeup[0]);
363 LeaveCriticalSection(&ctx->health_mutex[0]);
364#endif /* _WIN32 */
365 }
366}
367
368/* N:1 モード専用: ピアの send_window を使ってパックコンテナを構築して sendto する */
369static void flush_packed_peer(struct PotrContext_ *ctx, PotrPeerContext *peer,
370 size_t packed_len)
371{
372 PotrPacket outer_pkt;
374 uint32_t seq;
375 size_t wire_len;
376 uint8_t *packed_buf = ctx->send_wire_buf + PACKET_HEADER_SIZE;
377
378 shdr.service_id = ctx->service.service_id;
379 shdr.session_id = peer->session_id;
380 shdr.session_tv_sec = peer->session_tv_sec;
381 shdr.session_tv_nsec = peer->session_tv_nsec;
382
383#ifndef _WIN32
384 pthread_mutex_lock(&peer->send_window_mutex);
385#else /* _WIN32 */
386 EnterCriticalSection(&peer->send_window_mutex);
387#endif /* _WIN32 */
388
389 seq = peer->send_window.next_seq;
390
391 if (packet_build_packed(&outer_pkt, &shdr, seq, packed_buf, packed_len)
392 != POTR_SUCCESS)
393 {
394#ifndef _WIN32
395 pthread_mutex_unlock(&peer->send_window_mutex);
396#else /* _WIN32 */
397 LeaveCriticalSection(&peer->send_window_mutex);
398#endif /* _WIN32 */
399 return;
400 }
401
402 if (ctx->service.encrypt_enabled)
403 {
404 uint8_t nonce[POTR_CRYPTO_NONCE_SIZE];
405 size_t enc_len = ctx->crypto_buf_size;
406
407 outer_pkt.flags |= htons(POTR_FLAG_ENCRYPTED);
408 outer_pkt.payload_len = htons((uint16_t)(packed_len + POTR_CRYPTO_TAG_SIZE));
409
410 /* ノンス: session_id(4B NBO) + flags(2B NBO) + seq_num(4B NBO) + padding(2B)
411 * outer_pkt の各フィールドは既に NBO */
412 memcpy(nonce, &outer_pkt.session_id, 4);
413 memcpy(nonce + 4, &outer_pkt.flags, 2);
414 memcpy(nonce + 6, &outer_pkt.seq_num, 4);
415 memset(nonce + 10, 0, 2);
416
417 if (potr_encrypt(ctx->crypto_buf, &enc_len,
418 packed_buf, packed_len,
419 ctx->service.encrypt_key,
420 nonce,
421 (const uint8_t *)&outer_pkt, PACKET_HEADER_SIZE) != 0)
422 {
423#ifndef _WIN32
424 pthread_mutex_unlock(&peer->send_window_mutex);
425#else /* _WIN32 */
426 LeaveCriticalSection(&peer->send_window_mutex);
427#endif /* _WIN32 */
429 "sender[service_id=%" PRId64 "]: peer=%u encrypt failed seq=%u",
430 ctx->service.service_id, (unsigned)peer->peer_id, (unsigned)seq);
431 return;
432 }
433
434 outer_pkt.payload = ctx->crypto_buf;
435 window_send_push(&peer->send_window, &outer_pkt);
436
437#ifndef _WIN32
438 pthread_mutex_unlock(&peer->send_window_mutex);
439#else /* _WIN32 */
440 LeaveCriticalSection(&peer->send_window_mutex);
441#endif /* _WIN32 */
442
443 memcpy(ctx->send_wire_buf, &outer_pkt, PACKET_HEADER_SIZE);
444 memcpy(ctx->send_wire_buf + PACKET_HEADER_SIZE, ctx->crypto_buf, enc_len);
445 wire_len = PACKET_HEADER_SIZE + enc_len;
446
448 "sender[service_id=%" PRId64 "]: peer=%u DATA(enc) seq=%u packed_len=%zu",
449 ctx->service.service_id, (unsigned)peer->peer_id,
450 (unsigned)seq, packed_len);
451 }
452 else
453 {
454 window_send_push(&peer->send_window, &outer_pkt);
455
456#ifndef _WIN32
457 pthread_mutex_unlock(&peer->send_window_mutex);
458#else /* _WIN32 */
459 LeaveCriticalSection(&peer->send_window_mutex);
460#endif /* _WIN32 */
461
462 memcpy(ctx->send_wire_buf, &outer_pkt, PACKET_HEADER_SIZE);
463 wire_len = PACKET_HEADER_SIZE + packed_len;
464
466 "sender[service_id=%" PRId64 "]: peer=%u DATA seq=%u packed_len=%zu",
467 ctx->service.service_id, (unsigned)peer->peer_id,
468 (unsigned)seq, packed_len);
469 }
470
471 /* N:1 はインデックス = ctx->sock[] の添字として全パスへ送信する */
472 {
473 int k;
474 for (k = 0; k < (int)POTR_MAX_PATH; k++)
475 {
476 if (peer->dest_addr[k].sin_family == 0) continue;
477#ifndef _WIN32
478 sendto(ctx->sock[k], ctx->send_wire_buf, wire_len, 0,
479 (const struct sockaddr *)&peer->dest_addr[k],
480 sizeof(peer->dest_addr[k]));
481#else /* _WIN32 */
482 sendto(ctx->sock[k], (const char *)ctx->send_wire_buf, (int)wire_len, 0,
483 (const struct sockaddr *)&peer->dest_addr[k],
484 sizeof(peer->dest_addr[k]));
485#endif /* _WIN32 */
486 }
487 }
488
489 ctx->last_send_ms = get_ms();
490}
491
492/* N:1 モード専用: キューからエントリを取り出してピアへパッキング送信する */
493static void send_packed_peer_mode(struct PotrContext_ *ctx, PotrPayloadElem *first)
494{
495 PotrPeerId target_peer_id = first->peer_id;
496 PotrPeerContext *peer = NULL;
497 uint8_t *packed_buf = ctx->send_wire_buf + PACKET_HEADER_SIZE;
498 size_t packed_len = 0;
499 int n_dequeued = 1;
500
501 /* ピアを検索 (peers_mutex は lookup だけ保護、送信中は解放する) */
502#ifndef _WIN32
503 pthread_mutex_lock(&ctx->peers_mutex);
504#else /* _WIN32 */
505 EnterCriticalSection(&ctx->peers_mutex);
506#endif /* _WIN32 */
507 peer = peer_find_by_id(ctx, target_peer_id);
508#ifndef _WIN32
509 pthread_mutex_unlock(&ctx->peers_mutex);
510#else /* _WIN32 */
511 LeaveCriticalSection(&ctx->peers_mutex);
512#endif /* _WIN32 */
513
514 if (peer == NULL)
515 {
516 /* 切断済みピア宛エントリ: 破棄 */
518 return;
519 }
520
521 append_payload_elem(packed_buf, &packed_len, first);
522
523 /* 同一ピア宛の追加エントリを即時パッキング (pack_wait なし) */
524 if (!(first->flags & POTR_FLAG_MORE_FRAG))
525 {
526 PotrPayloadElem next;
527
528 while (potr_send_queue_peek(&ctx->send_queue, &next) == POTR_SUCCESS)
529 {
530 size_t elem_size;
531
532 if (next.peer_id != target_peer_id) break;
533 if (next.flags & POTR_FLAG_MORE_FRAG) break;
534
535 elem_size = POTR_PAYLOAD_ELEM_HDR_SIZE + (size_t)next.payload_len;
536 if (packed_len + elem_size > (size_t)ctx->global.max_payload
538 ? POTR_CRYPTO_TAG_SIZE : 0U))
539 {
540 break;
541 }
542
544 {
545 break;
546 }
547
548 append_payload_elem(packed_buf, &packed_len, &next);
549 n_dequeued++;
550 }
551 }
552
553 flush_packed_peer(ctx, peer, packed_len);
554
555 {
556 int i;
557 for (i = 0; i < n_dequeued; i++)
558 {
560 }
561 }
562}
563
564/* 送信スレッド本体 */
565#ifndef _WIN32
566static void *send_thread_func(void *arg)
567#else /* _WIN32 */
568static DWORD WINAPI send_thread_func(LPVOID arg)
569#endif /* _WIN32 */
570{
571 struct PotrContext_ *ctx = (struct PotrContext_ *)arg;
572 PotrPayloadElem first;
573
574 for (;;)
575 {
576 /* キューからエントリを取り出す (ブロッキング) */
577 if (potr_send_queue_pop(&ctx->send_queue, &first,
579 {
580 break;
581 }
582
583 /* N:1 モード: peer_id でルーティングして送信 */
584 if (ctx->is_multi_peer)
585 {
586 send_packed_peer_mode(ctx, &first);
587 continue;
588 }
589
590 /* パッキング試行 */
591 {
592 /* packed_buf は send_wire_buf のヘッダー直後領域を直接使用 (ゼロコピー) */
593 uint8_t *packed_buf = ctx->send_wire_buf + PACKET_HEADER_SIZE;
594 size_t packed_len = 0;
595 int n_dequeued = 1;
596
597 append_payload_elem(packed_buf, &packed_len, &first);
598
599 /* MORE_FRAG エントリはパッキング不可。そのまま単体コンテナとして送信。 */
600 if (!(first.flags & POTR_FLAG_MORE_FRAG))
601 {
602 uint32_t pack_wait_ms = ctx->service.pack_wait_ms;
603
604 if (pack_wait_ms > 0)
605 {
606 /* パッキング待ちあり: タイムアウトまで追加エントリを待ち合わせる */
607 uint64_t deadline = get_ms() + pack_wait_ms;
608 PotrPayloadElem next;
609
610 for (;;)
611 {
612 uint64_t now = get_ms();
613 uint32_t remaining;
614 size_t elem_size;
615
616 if (now >= deadline)
617 {
618 break; /* タイムアウト */
619 }
620
621 remaining = (uint32_t)(deadline - now);
622
623 if (potr_send_queue_peek_timed(&ctx->send_queue, &next, remaining)
624 != POTR_SUCCESS)
625 {
626 break; /* タイムアウト (エントリなし) */
627 }
628
629 if (next.flags & POTR_FLAG_MORE_FRAG)
630 {
631 break; /* MORE_FRAG はパッキング不可 */
632 }
633
634 elem_size = POTR_PAYLOAD_ELEM_HDR_SIZE + (size_t)next.payload_len;
635
636 if (packed_len + elem_size > (size_t)ctx->global.max_payload
638 ? POTR_CRYPTO_TAG_SIZE : 0U))
639 {
640 break; /* 容量満杯: 即時送信してタイマーリセット */
641 }
642
643 if (potr_send_queue_try_pop(&ctx->send_queue, &next)
644 != POTR_SUCCESS)
645 {
646 break; /* 競合防止 (通常発生しない) */
647 }
648
649 append_payload_elem(packed_buf, &packed_len, &next);
650 n_dequeued++;
651 }
652 }
653 else
654 {
655 /* パッキング待ちなし: キューにあるエントリを即時まとめる */
656 PotrPayloadElem next;
657
658 while (potr_send_queue_peek(&ctx->send_queue, &next) == POTR_SUCCESS)
659 {
660 size_t elem_size;
661
662 if (next.flags & POTR_FLAG_MORE_FRAG)
663 {
664 break;
665 }
666
667 elem_size = POTR_PAYLOAD_ELEM_HDR_SIZE + (size_t)next.payload_len;
668
669 if (packed_len + elem_size > (size_t)ctx->global.max_payload
671 ? POTR_CRYPTO_TAG_SIZE : 0U))
672 {
673 break;
674 }
675
676 if (potr_send_queue_try_pop(&ctx->send_queue, &next)
677 != POTR_SUCCESS)
678 {
679 break;
680 }
681
682 append_payload_elem(packed_buf, &packed_len, &next);
683 n_dequeued++;
684 }
685 }
686 }
687
688 /* 外側パケットを構築して送信 */
689 flush_packed(ctx, packed_len);
690
691 /* デキューした全エントリ分の inflight を減算 */
692 {
693 int i;
694 for (i = 0; i < n_dequeued; i++)
695 {
697 }
698 }
699 }
700 }
701
702#ifndef _WIN32
703 return NULL;
704#else /* _WIN32 */
705 return 0;
706#endif /* _WIN32 */
707}
708
709/* doxygen コメントは、ヘッダに記載 */
711{
712 ctx->send_thread_running = 1;
713
714#ifndef _WIN32
715 pthread_mutex_init(&ctx->send_window_mutex, NULL);
716 if (pthread_create(&ctx->send_thread, NULL, send_thread_func, ctx) != 0)
717 {
718 ctx->send_thread_running = 0;
719 pthread_mutex_destroy(&ctx->send_window_mutex);
720 return POTR_ERROR;
721 }
722#else /* _WIN32 */
723 InitializeCriticalSection(&ctx->send_window_mutex);
724 ctx->send_thread = CreateThread(NULL, 0, send_thread_func, ctx, 0, NULL);
725 if (ctx->send_thread == NULL)
726 {
727 ctx->send_thread_running = 0;
728 DeleteCriticalSection(&ctx->send_window_mutex);
729 return POTR_ERROR;
730 }
731#endif /* _WIN32 */
732
733 return POTR_SUCCESS;
734}
735
736/* doxygen コメントは、ヘッダに記載 */
738{
739 ctx->send_thread_running = 0;
741
742#ifndef _WIN32
743 pthread_join(ctx->send_thread, NULL);
744 pthread_mutex_destroy(&ctx->send_window_mutex);
745#else /* _WIN32 */
746 WaitForSingleObject(ctx->send_thread, INFINITE);
747 CloseHandle(ctx->send_thread);
748 DeleteCriticalSection(&ctx->send_window_mutex);
749#endif /* _WIN32 */
750}
データ暗号化・復号モジュールの内部ヘッダー。
int potr_encrypt(uint8_t *dst, size_t *dst_len, const uint8_t *src, size_t src_len, const uint8_t *key, const uint8_t *nonce, const uint8_t *aad, size_t aad_len)
AES-256-GCM でデータを暗号化します。
#define POTR_CRYPTO_NONCE_SIZE
AES-256-GCM ノンスサイズ (バイト)。session_id (4B NBO) + flags (2B NBO) + seq_or_ack_num (4B NBO) + padding (2B...
#define POTR_CRYPTO_TAG_SIZE
AES-256-GCM 認証タグサイズ (バイト)。暗号文の直後に付加する。
#define POTR_FLAG_MORE_FRAG
後続フラグメントが存在することを示すペイロードエレメントフラグ。メッセージが複数ペイロードエレメントに分割された場合、最終フラグメント以外に設定する。
#define POTR_MAX_PATH
マルチパスの最大パス数。
#define POTR_PAYLOAD_ELEM_HDR_SIZE
パックコンテナ内ペイロードエレメントのヘッダーサイズ (バイト)。flags (2): POTR_FLAG_MORE_FRAG / POTR_FLAG_COMPRESSED を格納 + payload_...
#define POTR_FLAG_ENCRYPTED
AES-256-GCM 認証タグが付与されていることを示す外側パケットフラグ。 POTR_FLAG_DATA と組み合わせる場合: [ヘッダー 32B][暗号文: packed_len B][GCM ...
#define POTR_SUCCESS
成功の戻り値を表す定数。
#define POTR_ERROR
失敗の戻り値を表す定数。
int packet_build_packed(PotrPacket *out, const PotrPacketSessionHdr *shdr, uint32_t seq_num, const void *packed_payload, size_t payload_len)
データパケット (パックコンテナ) を構築します。
Definition packet.c:200
パケット構築・解析モジュールの内部ヘッダー。
#define PACKET_HEADER_SIZE
パケットヘッダーの固定長 (バイト)。payload フィールドの開始オフセット。
Definition packet.h:23
通信ライブラリの定数ファイル。
@ POTR_TRACE_ERROR
エラー。操作の失敗を記録。TRACE_LV_ERROR (1) と同値。
@ POTR_TRACE_VERBOSE
詳細情報 (デバッグ)。TRACE_LV_VERBOSE (4) と同値。
uint32_t PotrPeerId
ピア識別子。
Definition porter_type.h:32
セッションコンテキスト内部定義ヘッダー。
struct PotrPeerContext_ PotrPeerContext
N:1 モードにおける個別ピアのコンテキスト。
#define POTR_INVALID_SOCKET
Definition potrContext.h:36
int PotrSocket
Definition potrContext.h:32
static int potr_is_tcp_type(PotrType t)
TCP 通信種別 (POTR_TYPE_TCP / POTR_TYPE_TCP_BIDIR) か判定する。
Definition potrContext.h:48
porter 内部ログマクロ定義ヘッダー。
#define POTR_LOG(level,...)
porter 内部ログ出力マクロ。
Definition potrLog.h:68
PotrPeerContext * peer_find_by_id(struct PotrContext_ *ctx, PotrPeerId peer_id)
peer_id でピアを検索する。
N:1 モード用ピアテーブル管理モジュールの内部ヘッダー。
void potr_send_queue_complete(PotrSendQueue *q)
int potr_send_queue_pop(PotrSendQueue *q, PotrPayloadElem *out, volatile int *running)
int potr_send_queue_peek_timed(PotrSendQueue *q, PotrPayloadElem *out, uint32_t timeout_ms)
int potr_send_queue_peek(PotrSendQueue *q, PotrPayloadElem *out)
void potr_send_queue_shutdown(PotrSendQueue *q)
int potr_send_queue_try_pop(PotrSendQueue *q, PotrPayloadElem *out)
非同期送信キューの型定義と操作関数。
pthread_mutex_t PotrMutex
static void append_payload_elem(uint8_t *packed_buf, size_t *packed_len, const PotrPayloadElem *entry)
static void flush_packed(struct PotrContext_ *ctx, size_t packed_len)
static void send_packed_peer_mode(struct PotrContext_ *ctx, PotrPayloadElem *first)
int potr_send_thread_start(struct PotrContext_ *ctx)
static void flush_packed_peer(struct PotrContext_ *ctx, PotrPeerContext *peer, size_t packed_len)
static void * send_thread_func(void *arg)
static uint64_t get_ms(void)
void potr_send_thread_stop(struct PotrContext_ *ctx)
static int tcp_send_all(PotrSocket fd, PotrMutex *mtx, const uint8_t *buf, size_t len)
非同期送信スレッドの内部ヘッダー。
セッションコンテキスト構造体。PotrHandle の実体。
uint32_t session_id
自セッション識別子 (乱数)。
volatile int tcp_active_paths
アクティブ TCP path 数 (0 = 全切断)。
PotrGlobalConfig global
グローバル設定。
PotrThread send_thread
送信スレッドハンドル。
PotrCondVar health_wakeup[POTR_MAX_PATH]
ヘルスチェックスレッドを即時起床させる条件変数 (path ごと)。
PotrMutex send_window_mutex
send_window 保護用ミューテックス (送信スレッド・ヘルスチェックスレッド・受信スレッドが競合するため)。
int32_t session_tv_nsec
自セッション開始時刻 ナノ秒部。
uint8_t * crypto_buf
暗号化・復号用一時バッファ (動的確保)。
PotrSendQueue send_queue
非同期送信キュー。
uint8_t * send_wire_buf
送信 wire 組立バッファ (動的確保。PACKET_HEADER_SIZE + max_payload バイト)。送信スレッドのみ使用。
volatile uint64_t last_send_ms
最終パケット送信時刻 (データ or PING、ms、単調増加)。0 = 未送信。
PotrMutex tcp_send_mutex[POTR_MAX_PATH]
TCP send() 排他制御 (path ごと)。送信スレッド・ヘルスチェックスレッド・recv スレッド競合防止。
int is_multi_peer
1: N:1 モード (src_addr/src_port 省略), 0: 1:1 モード。
int buf_full_suppress_cnt[POTR_MAX_PATH]
path ごとの送信バッファ満杯ログ抑制カウンタ (0: 抑制なし、1-10: 抑制中)。
PotrServiceDef service
サービス定義。
PotrMutex health_mutex[POTR_MAX_PATH]
ヘルスチェックスレッド停止用ミューテックス (path ごと)。
size_t crypto_buf_size
crypto_buf のサイズ (バイト)。
PotrMutex peers_mutex
ピアテーブル保護用ミューテックス。
PotrSocket sock[POTR_MAX_PATH]
各パスの UDP ソケット。
volatile int health_running[POTR_MAX_PATH]
ヘルスチェックスレッド実行フラグ (1: 実行中, 0: 停止)。path ごと。
PotrWindow send_window
送信バッファ (過去 N パケット保持。NACK 再送・REJECT 判定に使用)。
volatile int send_thread_running
送信スレッド実行フラグ (1: 実行中, 0: 停止)。
struct sockaddr_in dest_addr[POTR_MAX_PATH]
送信先ソケットアドレス (送信者が sendto に使用)。
int64_t session_tv_sec
自セッション開始時刻 秒部。
int n_path
有効パス数。
PotrSocket tcp_conn_fd[POTR_MAX_PATH]
アクティブ TCP 接続 fd (path ごと)。
uint16_t max_payload
最大ペイロード長 (バイト)。
パケットに付与するセッション識別情報。
Definition packet.h:31
int32_t session_tv_nsec
セッション開始時刻 ナノ秒部。
Definition packet.h:35
int64_t service_id
サービス識別子。
Definition packet.h:32
uint32_t session_id
セッション識別子 (乱数)。
Definition packet.h:34
int64_t session_tv_sec
セッション開始時刻 秒部。
Definition packet.h:33
ネットワーク送受信用パケット構造体。
uint32_t session_id
セッション識別子 (NBO)。potrOpenService 時に決定する乱数。
uint16_t flags
パケット種別フラグ (POTR_FLAG_*) (NBO)。
uint32_t seq_num
通番。送信側が付与する連番 (NBO)。
uint16_t payload_len
ペイロード長 (バイト) (NBO)。
送信キューの 1 エントリ。ペイロードエレメント 1 個分のデータを保持する。
uint16_t payload_len
ペイロード長 (バイト)。
uint16_t flags
ペイロードエレメントフラグ (MORE_FRAG, COMPRESSED など)。
uint8_t * payload
ペイロードデータへのポインタ (プールスロット内を指す)。
PotrPeerId peer_id
送信先ピア識別子 (N:1 モード用。1:1 モードでは 0)。
PotrWindow send_window
送信ウィンドウ (NACK 再送用)。
int32_t session_tv_nsec
自セッション開始時刻 ナノ秒部。
int64_t session_tv_sec
自セッション開始時刻 秒部。
struct sockaddr_in dest_addr[POTR_MAX_PATH]
送信先ソケットアドレス (インデックス = ctx->sock[] の添字)。未使用スロットは sin_family == 0。
uint32_t session_id
自セッション識別子 (乱数)。
PotrPeerId peer_id
外部公開用ピア識別子 (単調増加カウンタから付与)。
PotrMutex send_window_mutex
send_window 保護 (送信・受信・ヘルスチェックスレッド競合)。
int encrypt_enabled
非 0 のとき暗号化有効。設定ファイルに有効な encrypt_key が存在するときに 1 に設定される。
PotrType type
通信種別。
int64_t service_id
サービス ID。
uint32_t pack_wait_ms
パッキング待ち時間 (ミリ秒)。最初の送信要求からこの時間だけ待ち合わせ、複数メッセージを 1 パケットにまとめる。0 = 即時送信 (パッキング待ちなし)。パケット容量が満杯になった場合はタイマーを無...
uint8_t encrypt_key[POTR_CRYPTO_KEY_SIZE]
AES-256-GCM 事前共有鍵 (32 バイト)。encrypt_enabled が 0 の場合は未使用。
uint32_t next_seq
送信側: 次に割り当てる通番。受信側: 次に期待する通番。
Definition window.h:35
int window_send_push(PotrWindow *win, const PotrPacket *packet)
送信ウィンドウにパケットを積みます。
Definition window.c:141
スライディングウィンドウ管理モジュールの内部ヘッダー。