18 #include <sys/socket.h>
19 #include <sys/select.h>
20 #include <netinet/in.h>
21 #include <arpa/inet.h>
47 clock_gettime(CLOCK_MONOTONIC, &ts);
48 return (uint64_t)ts.tv_sec * 1000ULL + (uint64_t)ts.tv_nsec / 1000000ULL;
50 return (uint64_t)GetTickCount64();
56 size_t payload_len,
int compressed);
63 #define POTR_MUTEX_LOCK_LOCAL(m) pthread_mutex_lock(m)
64 #define POTR_MUTEX_UNLOCK_LOCAL(m) pthread_mutex_unlock(m)
67 #define POTR_MUTEX_LOCK_LOCAL(m) EnterCriticalSection(m)
68 #define POTR_MUTEX_UNLOCK_LOCAL(m) LeaveCriticalSection(m)
101 memcpy(nonce + 4, &nack_pkt.
flags, 2);
102 memcpy(nonce + 6, &nack_pkt.
ack_num, 4);
103 memset(nonce + 10, 0, 2);
118 if (peer->
dest_addr[k].sin_family == 0)
continue;
120 sendto(ctx->
sock[k], wire_buf, wire_len, 0,
121 (
const struct sockaddr *)&peer->
dest_addr[k],
124 sendto(ctx->
sock[k], (
const char *)wire_buf, (
int)wire_len, 0,
125 (
const struct sockaddr *)&peer->
dest_addr[k],
135 if (peer->
dest_addr[k].sin_family == 0)
continue;
137 sendto(ctx->
sock[k], &nack_pkt, wire_len, 0,
138 (
const struct sockaddr *)&peer->
dest_addr[k],
141 sendto(ctx->
sock[k], (
const char *)&nack_pkt, (
int)wire_len, 0,
142 (
const struct sockaddr *)&peer->
dest_addr[k],
175 memcpy(nonce + 4, &reject_pkt.
flags, 2);
176 memcpy(nonce + 6, &reject_pkt.
ack_num, 4);
177 memset(nonce + 10, 0, 2);
192 if (peer->
dest_addr[k].sin_family == 0)
continue;
194 sendto(ctx->
sock[k], wire_buf, wire_len, 0,
195 (
const struct sockaddr *)&peer->
dest_addr[k],
198 sendto(ctx->
sock[k], (
const char *)wire_buf, (
int)wire_len, 0,
199 (
const struct sockaddr *)&peer->
dest_addr[k],
209 if (peer->
dest_addr[k].sin_family == 0)
continue;
211 sendto(ctx->
sock[k], &reject_pkt, wire_len, 0,
212 (
const struct sockaddr *)&peer->
dest_addr[k],
215 sendto(ctx->
sock[k], (
const char *)&reject_pkt, (
int)wire_len, 0,
216 (
const struct sockaddr *)&peer->
dest_addr[k],
224 uint32_t req_seq_num)
228 uint32_t my_next_seq;
266 memcpy(nonce + 4, &reply_pkt.
flags, 2);
267 memcpy(nonce + 6, &reply_pkt.
seq_num, 4);
268 memset(nonce + 10, 0, 2);
283 if (peer->
dest_addr[k].sin_family == 0)
continue;
285 sendto(ctx->
sock[k], wire_buf, wire_len, 0,
286 (
const struct sockaddr *)&peer->
dest_addr[k],
289 sendto(ctx->
sock[k], (
const char *)wire_buf, (
int)wire_len, 0,
290 (
const struct sockaddr *)&peer->
dest_addr[k],
300 if (peer->
dest_addr[k].sin_family == 0)
continue;
302 sendto(ctx->
sock[k], &reply_pkt, wire_len, 0,
303 (
const struct sockaddr *)&peer->
dest_addr[k],
306 sendto(ctx->
sock[k], (
const char *)&reply_pkt, (
int)wire_len, 0,
307 (
const struct sockaddr *)&peer->
dest_addr[k],
337 "recv[service_id=%" PRId64
"]: CONNECTED peer=%u",
349 const uint8_t *payload,
size_t payload_len,
357 payload, payload_len) == 0)
365 "recv[service_id=%" PRId64
"]: peer=%u decompress failed",
457 "recv[service_id=%" PRId64
"]: peer=%u recv_window full, dropping seq=%u",
467 "recv[service_id=%" PRId64
"]: peer=%u recv_window utilization high (%u/%u)",
477 "recv[service_id=%" PRId64
"]: peer=%u NACK seq=%u",
495 "recv[service_id=%" PRId64
"]: peer=%u NACK seq=%u (post-drain)",
560 const struct sockaddr_in *sender_addr,
567 clock_gettime(CLOCK_MONOTONIC, &ts);
568 s = (int64_t)ts.tv_sec;
569 ns = (int32_t)ts.tv_nsec;
574 ms = GetTickCount64();
575 s = (int64_t)(ms / 1000ULL);
576 ns = (int32_t)((ms % 1000ULL) * 1000000UL);
582 if (peer->
dest_addr[path_idx].sin_family == AF_INET)
585 peer->
dest_addr[path_idx].sin_port = sender_addr->sin_port;
592 peer->
dest_addr[path_idx] = *sender_addr;
597 "n1_update_path_recv: peer=%u path %d learned",
598 (
unsigned)peer->
peer_id, path_idx);
609 clock_gettime(CLOCK_MONOTONIC, &ts);
610 now_sec = (int64_t)ts.tv_sec;
611 now_nsec = (int32_t)ts.tv_nsec;
616 ms = GetTickCount64();
617 now_sec = (int64_t)(ms / 1000ULL);
618 now_nsec = (int32_t)((ms % 1000ULL) * 1000000UL);
637 int64_t path_elapsed;
663 "recv[service_id=%" PRId64
"]: peer=%u DISCONNECTED (timeout %lldms)",
665 (
long long)elapsed_ms);
690 const struct sockaddr_in *sender)
708 for (i = 0; i < ctx->
n_path; i++)
718 for (i = 0; i < ctx->
n_path; i++)
731 for (i = 0; i < ctx->
n_path; i++)
761 "recv[service_id=%" PRId64
"]: new session (first contact), new_id=%u seq=%u",
780 "recv[service_id=%" PRId64
"]: new session (tv_sec %lld > %lld)"
781 ", old_id=%u new_id=%u",
794 "recv[service_id=%" PRId64
"]: new session (tv_nsec %d > %d)"
795 ", old_id=%u new_id=%u",
808 "recv[service_id=%" PRId64
"]: new session (id tiebreak %u > %u)",
841#define POTR_NACK_DEDUP_MS 200U
848 clock_gettime(CLOCK_MONOTONIC, &ts);
849 return (uint64_t)ts.tv_sec * 1000ULL + (uint64_t)ts.tv_nsec / 1000000ULL;
851 return (uint64_t)GetTickCount64();
860 clock_gettime(CLOCK_MONOTONIC, &ts);
861 *tv_sec = (int64_t)ts.tv_sec;
862 *tv_nsec = (int32_t)ts.tv_nsec;
864 ULONGLONG ms = GetTickCount64();
865 *tv_sec = (int64_t)(ms / 1000ULL);
866 *tv_nsec = (int32_t)((ms % 1000ULL) * 1000000UL);
873 const struct sockaddr_in *sender)
878 ctx->
peer_port[path_idx] = sender->sin_port;
886 && ctx->
dest_addr[path_idx].sin_addr.s_addr == 0)
888 ctx->
dest_addr[path_idx].sin_addr = sender->sin_addr;
890 if (ctx->
dest_addr[path_idx].sin_port == 0)
892 ctx->
dest_addr[path_idx].sin_port = sender->sin_port;
905 "recv[service_id=%" PRId64
"]: CONNECTED",
927 for (i = 0; i < ctx->
n_path; i++)
969 "recv[service_id=%" PRId64
"]: DISCONNECTED (timeout %lldms >= %ums)",
971 (
long long)elapsed_ms,
1010 uint32_t effective_ms;
1022 effective_ms = ms + (uint32_t)((uint32_t)now_nsec % ms);
1073 "recv[service_id=%" PRId64
"]: NACK seq=%u (reorder timeout)",
1105 memcpy(nonce + 4, &nack_pkt.
flags, 2);
1106 memcpy(nonce + 6, &nack_pkt.
ack_num, 4);
1107 memset(nonce + 10, 0, 2);
1120 for (i = 0; i < ctx->
n_path; i++)
1125 sendto(ctx->
sock[i], wire_buf, wire_len, 0,
1126 (
const struct sockaddr *)&ctx->
dest_addr[i],
1129 sendto(ctx->
sock[i], (
const char *)wire_buf, (
int)wire_len, 0,
1130 (
const struct sockaddr *)&ctx->
dest_addr[i],
1136 struct sockaddr_in dest;
1144 if (port == 0)
continue;
1146 memset(&dest, 0,
sizeof(dest));
1147 dest.sin_family = AF_INET;
1149 dest.sin_port = port;
1151 sendto(ctx->
sock[i], wire_buf, wire_len, 0,
1152 (
const struct sockaddr *)&dest,
sizeof(dest));
1154 sendto(ctx->
sock[i], (
const char *)wire_buf, (
int)wire_len, 0,
1155 (
const struct sockaddr *)&dest,
sizeof(dest));
1164 for (i = 0; i < ctx->
n_path; i++)
1171 sendto(ctx->
sock[i], &nack_pkt, wire_len, 0,
1172 (
const struct sockaddr *)&ctx->
dest_addr[i],
1175 sendto(ctx->
sock[i], (
const char *)&nack_pkt, (
int)wire_len, 0,
1176 (
const struct sockaddr *)&ctx->
dest_addr[i],
1182 struct sockaddr_in dest;
1194 if (port == 0)
continue;
1196 memset(&dest, 0,
sizeof(dest));
1197 dest.sin_family = AF_INET;
1199 dest.sin_port = port;
1202 sendto(ctx->
sock[i], &nack_pkt, wire_len, 0,
1203 (
const struct sockaddr *)&dest,
sizeof(dest));
1205 sendto(ctx->
sock[i], (
const char *)&nack_pkt, (
int)wire_len, 0,
1206 (
const struct sockaddr *)&dest,
sizeof(dest));
1219 uint32_t my_next_seq;
1253 "recv[service_id=%" PRId64
"]: PING reply sent (req_seq=%u my_next_seq=%u)",
1255 (
unsigned)req_seq_num, (
unsigned)my_next_seq);
1268 memcpy(nonce + 4, &reply_pkt.
flags, 2);
1269 memcpy(nonce + 6, &reply_pkt.
seq_num, 4);
1270 memset(nonce + 10, 0, 2);
1283 for (i = 0; i < ctx->
n_path; i++)
1286 sendto(ctx->
sock[i], wire_buf, wire_len, 0,
1287 (
const struct sockaddr *)&ctx->
dest_addr[i],
1290 sendto(ctx->
sock[i], (
const char *)wire_buf, (
int)wire_len, 0,
1291 (
const struct sockaddr *)&ctx->
dest_addr[i],
1300 for (i = 0; i < ctx->
n_path; i++)
1303 sendto(ctx->
sock[i], &reply_pkt, wire_len, 0,
1304 (
const struct sockaddr *)&ctx->
dest_addr[i],
1307 sendto(ctx->
sock[i], (
const char *)&reply_pkt, (
int)wire_len, 0,
1308 (
const struct sockaddr *)&ctx->
dest_addr[i],
1341 memcpy(nonce + 4, &reject_pkt.
flags, 2);
1342 memcpy(nonce + 6, &reject_pkt.
ack_num, 4);
1343 memset(nonce + 10, 0, 2);
1356 for (i = 0; i < ctx->
n_path; i++)
1359 sendto(ctx->
sock[i], wire_buf, wire_len, 0,
1360 (
const struct sockaddr *)&ctx->
dest_addr[i],
1363 sendto(ctx->
sock[i], (
const char *)wire_buf, (
int)wire_len, 0,
1364 (
const struct sockaddr *)&ctx->
dest_addr[i],
1373 for (i = 0; i < ctx->
n_path; i++)
1376 sendto(ctx->
sock[i], &reject_pkt, wire_len, 0,
1377 (
const struct sockaddr *)&ctx->
dest_addr[i],
1380 sendto(ctx->
sock[i], (
const char *)&reject_pkt, (
int)wire_len, 0,
1381 (
const struct sockaddr *)&ctx->
dest_addr[i],
1390 const uint8_t *payload,
1399 payload, payload_len) == 0)
1402 "recv[service_id=%" PRId64
"]: decompress %zu -> %zu bytes",
1412 "recv[service_id=%" PRId64
"]: decompress failed (src_len=%zu)",
1419 payload, payload_len);
1504 const char *pkt_type_str;
1507 pkt_type_str =
"PING";
1511 pkt_type_str =
"DATA";
1514 "recv[service_id=%" PRId64
"]: pop seq=%u %s",
1546 "recv[service_id=%" PRId64
"]: RAW DISCONNECTED (gap detected)",
1578 "recv[service_id=%" PRId64
"]: RAW recv_window full, resetting to seq=%u",
1586 "recv[service_id=%" PRId64
"]: RAW window re-push failed seq=%u (bug)",
1596 "recv[service_id=%" PRId64
"]: recv_window full (100%%), dropping seq=%u"
1597 " (base_seq=%u window_size=%u)",
1611 "recv[service_id=%" PRId64
"]: recv_window utilization high (%u/%u >= 80%%)"
1612 " seq=%u base_seq=%u",
1634 "recv[service_id=%" PRId64
"]: RAW gap re-push failed seq=%u (bug)",
1646 "recv[service_id=%" PRId64
"]: NACK seq=%u",
1672 "recv[service_id=%" PRId64
"]: NACK seq=%u (post-drain)",
1695 struct sockaddr_in sender_addr;
1702 if (poll_ms < 100U) poll_ms = 100U;
1727 for (i = 0; i < ctx->
n_path; i++)
1731 FD_SET(ctx->
sock[i], &readfds);
1732 if (ctx->
sock[i] > maxfd) maxfd = ctx->
sock[i];
1734 FD_SET(ctx->
sock[i], &readfds);
1744 if (maxfd < 0)
break;
1746 tv.tv_sec = (long)(poll_ms / 1000U);
1747 tv.tv_usec = (long)((poll_ms % 1000U) * 1000U);
1749 ret = select(maxfd + 1, &readfds, NULL, NULL, &tv);
1775 for (i = 0; i < ctx->
n_path; i++)
1779 socklen_t sender_len;
1785 if (!FD_ISSET(ctx->
sock[i], &readfds))
continue;
1787 memset(&sender_addr, 0,
sizeof(sender_addr));
1788 sender_len =
sizeof(sender_addr);
1791 recv_len = (int)recvfrom(ctx->
sock[i], buf,
1793 0, (
struct sockaddr *)&sender_addr,
1796 recv_len = recvfrom(ctx->
sock[i], (
char *)buf,
1798 0, (
struct sockaddr *)&sender_addr, &sender_len);
1804 "recv[service_id=%" PRId64
"]: recvfrom returned %d",
1812 "recv[service_id=%" PRId64
"]: packet parse failed (len=%d)",
1819 "recv[service_id=%" PRId64
"]: ignored packet for service_id=%" PRId64
"",
1829 int is_new_peer = 0;
1871 "recv[service_id=%" PRId64
"]: CONNECTED peer=%u from %u.%u.%u.%u:%u",
1873 (
unsigned)((ntohl(sender_addr.sin_addr.s_addr) >> 24) & 0xFF),
1874 (
unsigned)((ntohl(sender_addr.sin_addr.s_addr) >> 16) & 0xFF),
1875 (
unsigned)((ntohl(sender_addr.sin_addr.s_addr) >> 8) & 0xFF),
1876 (
unsigned)( ntohl(sender_addr.sin_addr.s_addr) & 0xFF),
1877 (
unsigned)ntohs(sender_addr.sin_port));
1892 uint16_t flags_nbo = htons((uint16_t)pkt.
flags);
1893 uint32_t seq_nbo = htonl(pkt.
seq_num);
1896 memcpy(nonce, &sid_nbo, 4);
1897 memcpy(nonce + 4, &flags_nbo, 2);
1898 memcpy(nonce + 6, &seq_nbo, 4);
1899 memset(nonce + 10, 0, 2);
1926 size_t dummy_len =
sizeof(dummy);
1929 uint16_t flags_nbo = htons((uint16_t)pkt.
flags);
1935 val_nbo = htonl(val);
1937 memcpy(nonce, &sid_nbo, 4);
1938 memcpy(nonce + 4, &flags_nbo, 2);
1939 memcpy(nonce + 6, &val_nbo, 4);
1940 memset(nonce + 10, 0, 2);
1961 "recv[service_id=%" PRId64
"]: peer=%u FIN received -> DISCONNECTED",
1980 size_t wire_len = 0;
2008 "recv[service_id=%" PRId64
"]: peer=%u NACK seq=%u -> retransmit",
2013 if (peer->
dest_addr[j].sin_family == 0)
continue;
2016 (
const struct sockaddr *)&peer->
dest_addr[j],
2021 (
const struct sockaddr *)&peer->
dest_addr[j],
2029 "recv[service_id=%" PRId64
"]: peer=%u NACK seq=%u not in window -> REJECT",
2079 "recv[service_id=%" PRId64
"]: peer=%u %s seq=%u",
2123 uint16_t flags_nbo = htons((uint16_t)pkt.
flags);
2124 uint32_t seq_nbo = htonl(pkt.
seq_num);
2127 memcpy(nonce, &sid_nbo, 4);
2128 memcpy(nonce + 4, &flags_nbo, 2);
2129 memcpy(nonce + 6, &seq_nbo, 4);
2130 memset(nonce + 10, 0, 2);
2140 "recv[service_id=%" PRId64
"]: decrypt failed (auth) seq=%u",
2154 size_t dummy_len =
sizeof(dummy);
2157 uint16_t flags_nbo = htons((uint16_t)pkt.
flags);
2163 "recv[service_id=%" PRId64
"]: encrypted no-payload pkt bad len=%u",
2171 val_nbo = htonl(val);
2174 memcpy(nonce, &sid_nbo, 4);
2175 memcpy(nonce + 4, &flags_nbo, 2);
2176 memcpy(nonce + 6, &val_nbo, 4);
2177 memset(nonce + 10, 0, 2);
2187 "recv[service_id=%" PRId64
"]: tag verify failed flags=0x%04x",
2250 size_t wire_len = 0;
2283 "sender[service_id=%" PRId64
"]: NACK received seq=%u"
2287 for (j = 0; j < ctx->
n_path; j++)
2291 (
const struct sockaddr *)&ctx->
dest_addr[j],
2296 (
const struct sockaddr *)&ctx->
dest_addr[j],
2304 "sender[service_id=%" PRId64
"]: NACK seq=%u not in window"
2331 "recv[service_id=%" PRId64
"]: FIN received -> DISCONNECTED",
2373 "recv[service_id=%" PRId64
"]: REJECT received seq=%u"
2374 " (packet unrecoverable)",
2413 const char *pkt_kind_str;
2416 pkt_kind_str =
"PING";
2420 pkt_kind_str =
"DATA";
2423 "recv[service_id=%" PRId64
"]: %s seq=%u path=%d",
2474 while (scan_seq != pkt.
seq_num
2485 "recv[service_id=%" PRId64
"]: NACK seq=%u (PING gap scan)",
2541 tv.tv_sec = (time_t)(wait_ms / 1000U);
2542 tv.tv_usec = (suseconds_t)((wait_ms % 1000U) * 1000U);
2543 ret = select(fd + 1, &rfds, NULL, NULL, &tv);
2544 if (ret < 0 && errno == EINTR)
return 0;
2545 if (ret < 0)
return -1;
2551 tv.tv_sec = (long)(wait_ms / 1000U);
2552 tv.tv_usec = (long)((wait_ms % 1000U) * 1000U);
2553 ret = select(0, &rfds, NULL, NULL, &tv);
2554 if (ret == SOCKET_ERROR)
return -1;
2556 return (ret > 0) ? 1 : 0;
2563 size_t received = 0;
2564 while (received < n)
2568 r = (int)recv(fd, buf + received, n - received, 0);
2569 if (r < 0)
return -1;
2570 if (r == 0)
return 0;
2572 r = recv(fd, (
char *)(buf + received), (
int)(n - received), 0);
2573 if (r == SOCKET_ERROR)
return -1;
2574 if (r == 0)
return 0;
2576 received += (size_t)r;
2601 "tcp_recv[service_id=%" PRId64
"]: CONNECTED",
2624 r = (int)send(fd, buf + sent, n - sent, 0);
2625 if (r <= 0)
return -1;
2627 r = send(fd, (
const char *)(buf + sent), (
int)(n - sent), 0);
2628 if (r == SOCKET_ERROR)
return -1;
2638 uint32_t req_seq_num)
2642 uint32_t my_next_seq;
2679 memcpy(nonce + 4, &reply_pkt.
flags, 2);
2680 memcpy(nonce + 6, &reply_pkt.
seq_num, 4);
2681 memset(nonce + 10, 0, 2);
2726 uint32_t poll_ms = use_ping_timeout
2733 "tcp_recv[service_id=%" PRId64
" path=%d]: starting (ping_req_timeout=%s)",
2735 use_ping_timeout ?
"enabled" :
"disabled");
2737 while (ctx->
running[path_idx])
2740 uint16_t wire_payload_len;
2761 memcpy(&wpl, buf + 30,
sizeof(wpl));
2762 wire_payload_len = ntohs(wpl);
2769 if (use_ping_timeout)
2772 if (!ctx->
running[path_idx])
break;
2773 if (readable < 0)
break;
2778 uint64_t elapsed =
get_ms() - last;
2782 "tcp_recv[service_id=%" PRId64
" path=%d]: PING req timeout"
2783 " (%llu ms), disconnecting",
2785 (
unsigned long long)elapsed);
2803 memcpy(&wpl, buf + 30,
sizeof(wpl));
2804 wire_payload_len = ntohs(wpl);
2811 "tcp_recv[service_id=%" PRId64
"]: oversized payload %u > max %u,"
2814 (
unsigned)wire_payload_len,
2820 if (wire_payload_len > 0)
2835 "tcp_recv[service_id=%" PRId64
"]: packet_parse failed",
2844 "tcp_recv[service_id=%" PRId64
"]: service_id mismatch (%" PRId64
")",
2856 uint16_t flg_nbo = htons((uint16_t)pkt.
flags);
2857 uint32_t seq_nbo = htonl(pkt.
seq_num);
2859 memcpy(nonce, &sid_nbo, 4);
2860 memcpy(nonce + 4, &flg_nbo, 2);
2861 memcpy(nonce + 6, &seq_nbo, 4);
2862 memset(nonce + 10, 0, 2);
2871 "tcp_recv[service_id=%" PRId64
"]: decrypt failed seq=%u",
2887 "tcp_recv[service_id=%" PRId64
" path=%d]: PING req seq=%u -> reply",
2899 "tcp_recv[service_id=%" PRId64
" path=%d]: PING resp seq=%u ack=%u",
2915 "tcp_recv[service_id=%" PRId64
" path=%d]: DATA session mismatch, ignored",
2926 "tcp_recv[service_id=%" PRId64
" path=%d]: DATA seq=%u duplicate, skipped",
2934 "tcp_recv[service_id=%" PRId64
" path=%d]: DATA seq=%u payload=%u",
2963 "tcp_recv[service_id=%" PRId64
" path=%d]: exited",
2990 "recv_thread[service_id=%" PRId64
"]: starting",
3000 "recv_thread[service_id=%" PRId64
"]: pthread_create failed",
3011 "recv_thread[service_id=%" PRId64
"]: CreateThread failed",
3039 for (i = 0; i < ctx->
n_path; i++)
3043 shutdown(ctx->
sock[i], SHUT_RD);
3051 for (i = 0; i < ctx->
n_path; i++)
3055 closesocket(ctx->
sock[i]);
3089 "tcp_recv_thread[service_id=%" PRId64
" path=%d]: starting",
3094 int rc = pthread_create(&ctx->
recv_thread[path_idx], NULL,
3101 "tcp_recv_thread[service_id=%" PRId64
" path=%d]: pthread_create failed",
3107 ctx->
recv_thread[path_idx] = CreateThread(NULL, 0,
3114 "tcp_recv_thread[service_id=%" PRId64
" path=%d]: CreateThread failed",
3144 WaitForSingleObject(ctx->
recv_thread[path_idx], INFINITE);
int potr_decompress(uint8_t *dst, size_t *dst_len, const uint8_t *src, size_t src_len)
圧縮データを解凍します。
int potr_encrypt(uint8_t *dst, size_t *dst_len, const uint8_t *src, size_t src_len, const uint8_t *key, const uint8_t *nonce, const uint8_t *aad, size_t aad_len)
AES-256-GCM でデータを暗号化します。
int potr_decrypt(uint8_t *dst, size_t *dst_len, const uint8_t *src, size_t src_len, const uint8_t *key, const uint8_t *nonce, const uint8_t *aad, size_t aad_len)
AES-256-GCM でデータを復号し、認証タグを検証します。
#define POTR_CRYPTO_NONCE_SIZE
AES-256-GCM ノンスサイズ (バイト)。session_id (4B NBO) + flags (2B NBO) + seq_or_ack_num (4B NBO) + padding (2B...
#define POTR_CRYPTO_TAG_SIZE
AES-256-GCM 認証タグサイズ (バイト)。暗号文の直後に付加する。
#define POTR_FLAG_MORE_FRAG
後続フラグメントが存在することを示すペイロードエレメントフラグ。メッセージが複数ペイロードエレメントに分割された場合、最終フラグメント以外に設定する。
#define POTR_FLAG_COMPRESSED
ペイロードが圧縮されていることを示すペイロードエレメントフラグ。圧縮はメッセージ単位で行い、全フラグメントのペイロードエレメントに設定する。先頭 4 バイトが元サイズ (NBO)、続くデータが raw ...
#define POTR_MAX_PATH
マルチパスの最大パス数。
#define POTR_FLAG_REJECT
再送不能通知パケットであることを示すフラグ。ack_num に再送不能な通番を格納する。
#define POTR_FLAG_PING
ヘルスチェック要求パケットであることを示すフラグ。ペイロードなし。
#define POTR_FLAG_DATA
データパケット (パックコンテナ) であることを示すフラグ。常に設定される。
#define POTR_FLAG_ENCRYPTED
AES-256-GCM 認証タグが付与されていることを示す外側パケットフラグ。 POTR_FLAG_DATA と組み合わせる場合: [ヘッダー 32B][暗号文: packed_len B][GCM ...
#define POTR_FLAG_FIN
正常終了通知パケットであることを示すフラグ。送信者が potrCloseService 時に送出し、受信者は即座に DISCONNECTED へ遷移する。ペイロードなし。
#define POTR_FLAG_NACK
再送要求パケットであることを示すフラグ。ack_num に要求通番を格納する。
#define POTR_PEER_NA
ピア ID 未割当を示す予約値。 1:1 モードのコールバックで渡される (ピアの概念がない)。 potrSend() に N:1 モードで指定した場合はエラーを返す。
#define POTR_SUCCESS
成功の戻り値を表す定数。
#define POTR_ERROR
失敗の戻り値を表す定数。
int packet_build_ping(PotrPacket *packet, const PotrPacketSessionHdr *shdr, uint32_t seq_num, uint32_t ack_num)
PING パケットを構築します。
int packet_parse(PotrPacket *packet, const void *buf, size_t buf_len)
受信バイト列をパケット構造体に解析します。
int packet_build_nack(PotrPacket *packet, const PotrPacketSessionHdr *shdr, uint32_t nack_num)
NACK パケットを構築します。
int packet_build_reject(PotrPacket *packet, const PotrPacketSessionHdr *shdr, uint32_t seq_num)
再送不能通知 (REJECT) パケットを構築します。
int packet_unpack_next(const PotrPacket *container, size_t *offset, PotrPacket *elem_out)
データパケットから次のペイロードエレメントを取り出します。
size_t packet_wire_size(const PotrPacket *packet)
パケットのヘッダー + ペイロードの合計バイト数を返します。
#define PACKET_HEADER_SIZE
パケットヘッダーの固定長 (バイト)。payload フィールドの開始オフセット。
@ POTR_TRACE_ERROR
エラー。操作の失敗を記録。TRACE_LV_ERROR (1) と同値。
@ POTR_TRACE_INFO
情報。TRACE_LV_INFO (3) と同値。
@ POTR_TRACE_VERBOSE
詳細情報 (デバッグ)。TRACE_LV_VERBOSE (4) と同値。
@ POTR_TRACE_WARNING
警告。回復可能な異常を記録。TRACE_LV_WARNING (2) と同値。
@ POTR_TYPE_BROADCAST
1:N 通信 (UDP ブロードキャスト)。
@ POTR_TYPE_TCP_BIDIR
TCP 双方向通信 (両端が potrSend 可)。
@ POTR_TYPE_UNICAST_BIDIR
双方向 1:1 通信 (UDP ユニキャスト)。
@ POTR_TYPE_MULTICAST
1:N 通信 (UDP マルチキャスト)。
uint32_t PotrPeerId
ピア識別子。
@ POTR_EVENT_DISCONNECTED
切断を検知 (タイムアウト / FIN 受信 / REJECT 受信)。data=NULL, len=0。
@ POTR_EVENT_CONNECTED
送信者からの疎通を初検知 or 復帰。data=NULL, len=0。
@ POTR_EVENT_DATA
データ受信。data/len に内容が格納される。
struct PotrPeerContext_ PotrPeerContext
N:1 モードにおける個別ピアのコンテキスト。
static int potr_is_raw_type(PotrType t)
RAW 系通信種別 (POTR_TYPE_*_RAW) か判定する。
#define POTR_NACK_DEDUP_SLOTS
NACK 重複抑制リングバッファのスロット数 (POTR_MAX_PATH × 2)。
#define POTR_INVALID_SOCKET
pthread_mutex_t PotrMutexLocal
#define POTR_LOG(level,...)
porter 内部ログ出力マクロ。
PotrPeerContext * peer_find_by_session(struct PotrContext_ *ctx, uint32_t session_id, int64_t session_tv_sec, int32_t session_tv_nsec)
session_triplet でピアを検索する。
void peer_path_clear(struct PotrContext_ *ctx, PotrPeerContext *peer, int path_idx)
ピアの特定パスをクリアしてスロットを未使用に戻す。
PotrPeerContext * peer_create(struct PotrContext_ *ctx, const struct sockaddr_in *sender_addr, int path_idx)
新規ピアを作成する。
void peer_free(struct PotrContext_ *ctx, PotrPeerContext *peer)
ピアリソースを解放してスロットをクリアする。
N:1 モード用ピアテーブル管理モジュールの内部ヘッダー。
static void get_monotonic(int64_t *tv_sec, int32_t *tv_nsec)
static void n1_send_ping_reply(struct PotrContext_ *ctx, PotrPeerContext *peer, uint32_t req_seq_num)
int comm_recv_thread_start(struct PotrContext_ *ctx)
非 TCP 受信スレッドを起動します。
static int reorder_gap_ready(struct PotrContext_ *ctx, uint32_t nack_num)
static void send_reject(struct PotrContext_ *ctx, uint32_t seq_num)
static void n1_send_nack(struct PotrContext_ *ctx, PotrPeerContext *peer, uint32_t nack_seq)
#define POTR_MUTEX_LOCK_LOCAL(m)
#define POTR_NACK_DEDUP_MS
NACK 重複抑制の時間窓 (ミリ秒)。この時間内の同一 ack_num の NACK は破棄する。
static void drain_recv_window(struct PotrContext_ *ctx)
static void * recv_thread_func(void *arg)
static uint64_t get_ms_mono(void)
static void n1_process_outer_pkt(struct PotrContext_ *ctx, PotrPeerContext *peer, const PotrPacket *pkt)
static int tcp_send_all_raw(PotrSocket fd, const uint8_t *buf, size_t n)
static int tcp_read_all(PotrSocket fd, uint8_t *buf, size_t n)
static void send_ping_reply(struct PotrContext_ *ctx, uint32_t req_seq_num)
int tcp_recv_thread_start(struct PotrContext_ *ctx, int path_idx)
TCP 受信スレッドを path ごとに起動します。
#define POTR_MUTEX_UNLOCK_LOCAL(m)
static void process_outer_pkt(struct PotrContext_ *ctx, const PotrPacket *pkt)
static void n1_drain_recv_window(struct PotrContext_ *ctx, PotrPeerContext *peer)
static void tcp_send_ping_reply(struct PotrContext_ *ctx, int path_idx, uint32_t req_seq_num)
static void recv_deliver(struct PotrContext_ *ctx, const uint8_t *payload, size_t payload_len, int compressed)
static void check_reorder_timeout(struct PotrContext_ *ctx)
static void n1_notify_health_alive(struct PotrContext_ *ctx, PotrPeerContext *peer)
int comm_recv_thread_stop(struct PotrContext_ *ctx)
非 TCP 受信スレッドを停止します。
static void update_path_recv(struct PotrContext_ *ctx, int path_idx, const struct sockaddr_in *sender)
static void n1_recv_deliver(struct PotrContext_ *ctx, PotrPeerContext *peer, const uint8_t *payload, size_t payload_len, int compressed)
static void notify_connected_tcp(struct PotrContext_ *ctx)
static void * tcp_recv_thread_func(void *arg)
static void n1_deliver_payload_elem(struct PotrContext_ *ctx, PotrPeerContext *peer, const PotrPacket *elem)
static int check_and_update_session(struct PotrContext_ *ctx, const PotrPacket *pkt)
static int n1_reorder_gap_ready(PotrPeerContext *peer, uint32_t nack_num)
static void n1_check_health_timeout(struct PotrContext_ *ctx)
int tcp_recv_thread_stop(struct PotrContext_ *ctx, int path_idx)
TCP 受信スレッドの終了を待機します。
static void check_health_timeout(struct PotrContext_ *ctx)
static int n1_check_and_update_session(struct PotrContext_ *ctx, PotrPeerContext *peer, const PotrPacket *pkt)
static void raw_session_disconnect(struct PotrContext_ *ctx)
static TcpRecvArg s_tcp_recv_args[POTR_MAX_PATH]
static void notify_health_alive(struct PotrContext_ *ctx)
static void n1_update_path_recv(PotrPeerContext *peer, const struct sockaddr_in *sender_addr, int path_idx)
static uint64_t get_ms(void)
static int check_src_addr(const struct PotrContext_ *ctx, const struct sockaddr_in *sender)
static void deliver_payload_elem(struct PotrContext_ *ctx, const PotrPacket *elem)
static void send_nack(struct PotrContext_ *ctx, uint32_t nack_seq)
static void n1_send_reject(struct PotrContext_ *ctx, PotrPeerContext *peer, uint32_t seq_num)
static int tcp_wait_readable(PotrSocket fd, uint32_t wait_ms)
int seqnum_in_window(uint32_t seq, uint32_t base, uint16_t window_size)
通番 seq がウィンドウ [base, base + window_size) に含まれるか判定します。
セッションコンテキスト構造体。PotrHandle の実体。
uint32_t session_id
自セッション識別子 (乱数)。
PotrRecvCallback callback
受信コールバック。
volatile uint64_t tcp_last_ping_req_recv_ms[POTR_MAX_PATH]
TCP PING 要求最終受信時刻 (ms, CLOCK_MONOTONIC 基準)。path ごと。接続確立時に現在時刻で初期化。
PotrGlobalConfig global
グローバル設定。
volatile uint64_t tcp_last_ping_recv_ms[POTR_MAX_PATH]
TCP PING 応答最終受信時刻 (ms, CLOCK_MONOTONIC 基準)。path ごと。接続確立時に現在時刻で初期化。
struct in_addr dst_addr_resolved[POTR_MAX_PATH]
解決済み宛先 IPv4 アドレス (unicast のみ)。
size_t compress_buf_size
compress_buf のサイズ (バイト)。
uint8_t * frag_buf
フラグメント結合バッファ (動的確保。max_message_size バイト)。
PotrMutex send_window_mutex
send_window 保護用ミューテックス (送信スレッド・ヘルスチェックスレッド・受信スレッドが競合するため)。
int64_t peer_session_tv_sec
追跡中の相手セッション開始時刻 秒部。
uint32_t reorder_nack_num
待機中の欠番通番。
int32_t peer_session_tv_nsec
追跡中の相手セッション開始時刻 ナノ秒部。
struct in_addr src_addr_resolved[POTR_MAX_PATH]
解決済み送信元 IPv4 アドレス。
int32_t session_tv_nsec
自セッション開始時刻 ナノ秒部。
PotrRole role
役割 (POTR_ROLE_SENDER / POTR_ROLE_RECEIVER)。
uint8_t * crypto_buf
暗号化・復号用一時バッファ (動的確保)。
uint8_t * compress_buf
圧縮・解凍用一時バッファ (動的確保)。
uint8_t * recv_buf
受信バッファ / 再送 wire 組立バッファ (動的確保。PACKET_HEADER_SIZE + max_payload バイト)。
volatile int running[POTR_MAX_PATH]
受信スレッド実行フラグ (1: 実行中, 0: 停止)。path ごと。
volatile int health_alive
疎通状態 (1: alive, 0: dead/未接続)。UDP 用。受信者が管理。
size_t frag_buf_len
フラグメント結合バッファの現在のデータ長 (バイト)。
PotrThread recv_thread[POTR_MAX_PATH]
受信スレッドハンドル (path ごと)。
uint32_t peer_session_id
追跡中の相手セッション識別子。
size_t tcp_first_pkt_len[POTR_MAX_PATH]
先読みパケットのバイト数 (0: 先読みなし)。
PotrMutex tcp_send_mutex[POTR_MAX_PATH]
TCP send() 排他制御 (path ごと)。送信スレッド・ヘルスチェックスレッド・recv スレッド競合防止。
int32_t reorder_deadline_nsec
タイムアウト期限 ナノ秒部。
int64_t reorder_deadline_sec
タイムアウト期限 秒部 (CLOCK_MONOTONIC)。
int32_t path_last_recv_nsec[POTR_MAX_PATH]
パスごとの最終受信時刻 ナノ秒部。
int frag_compressed
フラグメント受信中の圧縮フラグ (非 0: 圧縮あり)。
PotrMutex tcp_state_mutex
tcp_state_cv 保護用ミューテックス。tcp_active_paths のカウンタ更新も保護。
int is_multi_peer
1: N:1 モード (src_addr/src_port 省略), 0: 1:1 モード。
PotrServiceDef service
サービス定義。
uint16_t peer_port[POTR_MAX_PATH]
各パスで観測した送信者ポート (NBO)。0 = 未観測。
size_t crypto_buf_size
crypto_buf のサイズ (バイト)。
PotrMutex peers_mutex
ピアテーブル保護用ミューテックス。
uint8_t * tcp_first_pkt_buf[POTR_MAX_PATH]
先読みパケットバッファ (動的確保、PACKET_HEADER_SIZE + max_payload バイト)。
PotrSocket sock[POTR_MAX_PATH]
各パスの UDP ソケット。
int peer_session_known
相手セッションが初期化済みか (0: 未初期化)。
PotrWindow send_window
送信バッファ (過去 N パケット保持。NACK 再送・REJECT 判定に使用)。
int max_peers
ピアテーブルサイズ (service.max_peers から取得)。
int64_t path_last_recv_sec[POTR_MAX_PATH]
パスごとの最終受信時刻 秒部。0 = 未受信。
uint8_t nack_dedup_next
次に書き込むスロットインデックス。
int reorder_pending
リオーダー待機中か (1: 待機中、0: 待機なし)。
PotrMutex recv_window_mutex
recv_window 保護用ミューテックス。
struct sockaddr_in dest_addr[POTR_MAX_PATH]
送信先ソケットアドレス (送信者が sendto に使用)。
PotrWindow recv_window
受信ウィンドウ (順序整列・欠番検出)。
int64_t session_tv_sec
自セッション開始時刻 秒部。
PotrPeerContext * peers
ピアテーブル (動的確保。max_peers エントリ)。
PotrSocket tcp_conn_fd[POTR_MAX_PATH]
アクティブ TCP 接続 fd (path ごと)。
int32_t last_recv_tv_nsec
最終受信時刻 ナノ秒部。
PotrNackDedupEntry nack_dedup_buf[POTR_NACK_DEDUP_SLOTS]
NACK 重複抑制エントリ配列。
int64_t last_recv_tv_sec
最終受信時刻 秒部。0 = 未受信。
uint16_t window_size
スライディングウィンドウサイズ (パケット数)。
uint32_t reorder_timeout_ms
受信ウィンドウ欠番検出後、NACK または切断を遅延する時間 (ミリ秒)。マルチパスや近距離 WAN での追い越し吸収用。0 = 即時 (デフォルト)。推奨値: LAN/マルチパス=10〜30 ms、遠...
uint16_t max_payload
最大ペイロード長 (バイト)。
uint32_t health_timeout_ms
UDP 通信種別の受信タイムアウト (ミリ秒)。RECEIVER 側で使用。0 = 無効。設定ファイルキー: udp_health_timeout_ms。
uint32_t max_message_size
1 回の potrSend で送信できる最大メッセージ長 (バイト)。デフォルト: POTR_MAX_MESSAGE_SIZE。
uint64_t time_ms
処理時刻 (ms、単調増加)。0 = 未使用スロット。
uint32_t ack_num
再送または REJECT した ack_num。
int32_t session_tv_nsec
セッション開始時刻 ナノ秒部。
int64_t service_id
サービス識別子。
uint32_t session_id
セッション識別子 (乱数)。
int64_t session_tv_sec
セッション開始時刻 秒部。
uint32_t ack_num
再送要求番号 / 再送不能通番 (NBO)。NACK では要求通番、REJECT では再送不能通番を格納する。
int64_t session_tv_sec
セッション開始時刻 秒部 (NBO)。struct timespec の tv_sec 相当。
uint32_t session_id
セッション識別子 (NBO)。potrOpenService 時に決定する乱数。
uint16_t flags
パケット種別フラグ (POTR_FLAG_*) (NBO)。
const uint8_t * payload
ペイロードデータへのポインタ (読み取り専用)。ウィンドウプールまたは受信バッファ内を指す。
int32_t session_tv_nsec
セッション開始時刻 ナノ秒部 (NBO)。struct timespec の tv_nsec 相当。
uint32_t seq_num
通番。送信側が付与する連番 (NBO)。
int64_t service_id
サービス識別子 (NBO)。受信時に照合する。
uint16_t payload_len
ペイロード長 (バイト) (NBO)。
int reorder_pending
リオーダー待機中 (1: 待機中, 0: 待機なし)。
PotrWindow send_window
送信ウィンドウ (NACK 再送用)。
int32_t last_recv_tv_nsec
最終受信時刻 ナノ秒部。
size_t frag_buf_len
現在のデータ長。
int frag_compressed
圧縮フラグ (非 0: 圧縮あり)。
int32_t session_tv_nsec
自セッション開始時刻 ナノ秒部。
int64_t session_tv_sec
自セッション開始時刻 秒部。
int active
1: 有効スロット, 0: 空き。
uint8_t * frag_buf
フラグメント結合バッファ (動的確保)。
int64_t last_recv_tv_sec
最終受信時刻 秒部 (CLOCK_MONOTONIC)。0 = 未受信。
PotrWindow recv_window
受信ウィンドウ (順序整列)。
int n_paths
アクティブパス数。ループ境界には使わず管理カウンタとして使用する。
int64_t peer_session_tv_sec
追跡中のピアセッション開始時刻 秒部。
volatile int health_alive
疎通状態 (1: alive, 0: dead/未接続)。
struct sockaddr_in dest_addr[POTR_MAX_PATH]
送信先ソケットアドレス (インデックス = ctx->sock[] の添字)。未使用スロットは sin_family == 0。
uint32_t peer_session_id
追跡中のピアセッション識別子。
uint32_t session_id
自セッション識別子 (乱数)。
PotrPeerId peer_id
外部公開用ピア識別子 (単調増加カウンタから付与)。
int64_t path_last_recv_sec[POTR_MAX_PATH]
パスごとの最終受信時刻 秒部。未使用スロットは 0。
int peer_session_known
ピアセッションが初期化済みか (0: 未初期化)。
PotrMutex send_window_mutex
send_window 保護 (送信・受信・ヘルスチェックスレッド競合)。
int32_t peer_session_tv_nsec
追跡中のピアセッション開始時刻 ナノ秒部。
int32_t path_last_recv_nsec[POTR_MAX_PATH]
パスごとの最終受信時刻 ナノ秒部。
uint16_t src_port
送信者の送信元 bind ポート番号。0 = OS 自動選定。(全通信種別で省略可)
int encrypt_enabled
非 0 のとき暗号化有効。設定ファイルに有効な encrypt_key が存在するときに 1 に設定される。
char src_addr[POTR_MAX_PATH][POTR_MAX_ADDR_LEN]
送信元アドレス [0]=src_addr1 〜 [3]=src_addr4。送信者は bind / 送信インターフェース、受信者は送信元フィルタ。(全通信種別で必須)
int64_t service_id
サービス ID。
char dst_addr[POTR_MAX_PATH][POTR_MAX_ADDR_LEN]
宛先アドレス [0]=dst_addr1 〜 [3]=dst_addr4。送信者は送信先、受信者は bind アドレス。(unicast のみ)
uint8_t encrypt_key[POTR_CRYPTO_KEY_SIZE]
AES-256-GCM 事前共有鍵 (32 バイト)。encrypt_enabled が 0 の場合は未使用。
uint32_t base_seq
ウィンドウ先頭の通番。
uint32_t next_seq
送信側: 次に割り当てる通番。受信側: 次に期待する通番。
uint8_t * valid
バッファ有効フラグ配列 (動的確保。window_size バイト)。
uint16_t window_size
ウィンドウサイズ (パケット数)。
struct PotrContext_ * ctx
void window_recv_skip(PotrWindow *win, uint32_t seq_num)
受信ウィンドウで指定通番をスキップして次の通番へ前進させます。
int window_init(PotrWindow *win, uint32_t initial_seq, uint16_t window_size, uint16_t max_payload)
ウィンドウを初期化します。
int window_recv_needs_nack(const PotrWindow *win, uint32_t *nack_num)
受信ウィンドウで欠番が発生しているか確認し、NACK 番号を返します。
void window_recv_reset(PotrWindow *win, uint32_t new_base_seq)
受信ウィンドウを新しい基点通番でリセットします。
int window_send_get(const PotrWindow *win, uint32_t seq_num, PotrPacket *packet_out)
送信ウィンドウから指定通番のパケットを取得します (再送用)。
int window_recv_pop(PotrWindow *win, PotrPacket *packet)
受信ウィンドウから順序整列済みパケットを取り出します。
int window_recv_push(PotrWindow *win, const PotrPacket *packet)
受信ウィンドウにパケットを格納します。
スライディングウィンドウ管理モジュールの内部ヘッダー。