37 #include <sys/socket.h>
38 #include <netinet/in.h>
65 clock_gettime(CLOCK_MONOTONIC, &ts);
66 return (uint64_t)ts.tv_sec * 1000ULL + (uint64_t)ts.tv_nsec / 1000000ULL;
68 return (uint64_t)GetTickCount64();
76 uint16_t flags_nbo = htons(entry->
flags);
77 uint32_t plen_nbo = htonl((uint32_t)entry->
payload_len);
79 memcpy(packed_buf + *packed_len, &flags_nbo, 2); *packed_len += 2;
80 memcpy(packed_buf + *packed_len, &plen_nbo, 4); *packed_len += 4;
88 const uint8_t *buf,
size_t len)
94 pthread_mutex_lock(mtx);
97 ssize_t n = send(fd, buf + sent, len - sent, 0);
101 pthread_mutex_unlock(mtx);
103 EnterCriticalSection(mtx);
106 int n = send(fd, (
const char *)(buf + sent), (
int)(len - sent), 0);
110 LeaveCriticalSection(mtx);
175 memcpy(nonce + 4, &outer_pkt.
flags, 2);
176 memcpy(nonce + 6, &outer_pkt.
seq_num, 4);
177 memset(nonce + 10, 0, 2);
180 packed_buf, packed_len,
191 "sender[service_id=%" PRId64
"]: encrypt failed seq=%u",
224 "sender[service_id=%" PRId64
"]: DATA(enc) seq=%u packed_len=%zu enc_len=%zu",
254 "sender[service_id=%" PRId64
"]: DATA seq=%u packed_len=%zu",
264 for (i = 0; i < ctx->
n_path; i++)
275 pfd.events = POLLOUT;
277 pr = poll(&pfd, 1, 0);
278 if (pr > 0 && (pfd.revents & POLLOUT))
293 "send_thread[service_id=%" PRId64
"]: path[%d]"
294 " send buffer full, packet skipped",
304 pfd.events = POLLOUT;
306 pr = WSAPoll(&pfd, 1, 0);
307 if (pr > 0 && (pfd.revents & POLLOUT))
322 "send_thread[service_id=%" PRId64
"]: path[%d]"
323 " send buffer full, packet skipped",
336 for (i = 0; i < ctx->
n_path; i++)
340 (
const struct sockaddr *)&ctx->
dest_addr[i],
344 (
const struct sockaddr *)&ctx->
dest_addr[i],
413 memcpy(nonce + 4, &outer_pkt.
flags, 2);
414 memcpy(nonce + 6, &outer_pkt.
seq_num, 4);
415 memset(nonce + 10, 0, 2);
418 packed_buf, packed_len,
429 "sender[service_id=%" PRId64
"]: peer=%u encrypt failed seq=%u",
448 "sender[service_id=%" PRId64
"]: peer=%u DATA(enc) seq=%u packed_len=%zu",
450 (
unsigned)seq, packed_len);
466 "sender[service_id=%" PRId64
"]: peer=%u DATA seq=%u packed_len=%zu",
468 (
unsigned)seq, packed_len);
476 if (peer->
dest_addr[k].sin_family == 0)
continue;
479 (
const struct sockaddr *)&peer->
dest_addr[k],
483 (
const struct sockaddr *)&peer->
dest_addr[k],
498 size_t packed_len = 0;
532 if (next.
peer_id != target_peer_id)
break;
557 for (i = 0; i < n_dequeued; i++)
594 size_t packed_len = 0;
604 if (pack_wait_ms > 0)
607 uint64_t deadline =
get_ms() + pack_wait_ms;
621 remaining = (uint32_t)(deadline - now);
694 for (i = 0; i < n_dequeued; i++)
int potr_encrypt(uint8_t *dst, size_t *dst_len, const uint8_t *src, size_t src_len, const uint8_t *key, const uint8_t *nonce, const uint8_t *aad, size_t aad_len)
AES-256-GCM でデータを暗号化します。
#define POTR_CRYPTO_NONCE_SIZE
AES-256-GCM ノンスサイズ (バイト)。session_id (4B NBO) + flags (2B NBO) + seq_or_ack_num (4B NBO) + padding (2B...
#define POTR_CRYPTO_TAG_SIZE
AES-256-GCM 認証タグサイズ (バイト)。暗号文の直後に付加する。
#define POTR_FLAG_MORE_FRAG
後続フラグメントが存在することを示すペイロードエレメントフラグ。メッセージが複数ペイロードエレメントに分割された場合、最終フラグメント以外に設定する。
#define POTR_MAX_PATH
マルチパスの最大パス数。
#define POTR_PAYLOAD_ELEM_HDR_SIZE
パックコンテナ内ペイロードエレメントのヘッダーサイズ (バイト)。flags (2): POTR_FLAG_MORE_FRAG / POTR_FLAG_COMPRESSED を格納 + payload_...
#define POTR_FLAG_ENCRYPTED
AES-256-GCM 認証タグが付与されていることを示す外側パケットフラグ。 POTR_FLAG_DATA と組み合わせる場合: [ヘッダー 32B][暗号文: packed_len B][GCM ...
#define POTR_SUCCESS
成功の戻り値を表す定数。
#define POTR_ERROR
失敗の戻り値を表す定数。
int packet_build_packed(PotrPacket *out, const PotrPacketSessionHdr *shdr, uint32_t seq_num, const void *packed_payload, size_t payload_len)
データパケット (パックコンテナ) を構築します。
#define PACKET_HEADER_SIZE
パケットヘッダーの固定長 (バイト)。payload フィールドの開始オフセット。
@ POTR_TRACE_ERROR
エラー。操作の失敗を記録。TRACE_LV_ERROR (1) と同値。
@ POTR_TRACE_VERBOSE
詳細情報 (デバッグ)。TRACE_LV_VERBOSE (4) と同値。
uint32_t PotrPeerId
ピア識別子。
struct PotrPeerContext_ PotrPeerContext
N:1 モードにおける個別ピアのコンテキスト。
#define POTR_INVALID_SOCKET
static int potr_is_tcp_type(PotrType t)
TCP 通信種別 (POTR_TYPE_TCP / POTR_TYPE_TCP_BIDIR) か判定する。
#define POTR_LOG(level,...)
porter 内部ログ出力マクロ。
PotrPeerContext * peer_find_by_id(struct PotrContext_ *ctx, PotrPeerId peer_id)
peer_id でピアを検索する。
N:1 モード用ピアテーブル管理モジュールの内部ヘッダー。
void potr_send_queue_complete(PotrSendQueue *q)
int potr_send_queue_pop(PotrSendQueue *q, PotrPayloadElem *out, volatile int *running)
int potr_send_queue_peek_timed(PotrSendQueue *q, PotrPayloadElem *out, uint32_t timeout_ms)
int potr_send_queue_peek(PotrSendQueue *q, PotrPayloadElem *out)
void potr_send_queue_shutdown(PotrSendQueue *q)
int potr_send_queue_try_pop(PotrSendQueue *q, PotrPayloadElem *out)
pthread_mutex_t PotrMutex
static void append_payload_elem(uint8_t *packed_buf, size_t *packed_len, const PotrPayloadElem *entry)
static void flush_packed(struct PotrContext_ *ctx, size_t packed_len)
static void send_packed_peer_mode(struct PotrContext_ *ctx, PotrPayloadElem *first)
int potr_send_thread_start(struct PotrContext_ *ctx)
static void flush_packed_peer(struct PotrContext_ *ctx, PotrPeerContext *peer, size_t packed_len)
static void * send_thread_func(void *arg)
static uint64_t get_ms(void)
void potr_send_thread_stop(struct PotrContext_ *ctx)
static int tcp_send_all(PotrSocket fd, PotrMutex *mtx, const uint8_t *buf, size_t len)
セッションコンテキスト構造体。PotrHandle の実体。
uint32_t session_id
自セッション識別子 (乱数)。
volatile int tcp_active_paths
アクティブ TCP path 数 (0 = 全切断)。
PotrGlobalConfig global
グローバル設定。
PotrThread send_thread
送信スレッドハンドル。
PotrCondVar health_wakeup[POTR_MAX_PATH]
ヘルスチェックスレッドを即時起床させる条件変数 (path ごと)。
PotrMutex send_window_mutex
send_window 保護用ミューテックス (送信スレッド・ヘルスチェックスレッド・受信スレッドが競合するため)。
int32_t session_tv_nsec
自セッション開始時刻 ナノ秒部。
uint8_t * crypto_buf
暗号化・復号用一時バッファ (動的確保)。
PotrSendQueue send_queue
非同期送信キュー。
uint8_t * send_wire_buf
送信 wire 組立バッファ (動的確保。PACKET_HEADER_SIZE + max_payload バイト)。送信スレッドのみ使用。
volatile uint64_t last_send_ms
最終パケット送信時刻 (データ or PING、ms、単調増加)。0 = 未送信。
PotrMutex tcp_send_mutex[POTR_MAX_PATH]
TCP send() 排他制御 (path ごと)。送信スレッド・ヘルスチェックスレッド・recv スレッド競合防止。
int is_multi_peer
1: N:1 モード (src_addr/src_port 省略), 0: 1:1 モード。
int buf_full_suppress_cnt[POTR_MAX_PATH]
path ごとの送信バッファ満杯ログ抑制カウンタ (0: 抑制なし、1-10: 抑制中)。
PotrServiceDef service
サービス定義。
PotrMutex health_mutex[POTR_MAX_PATH]
ヘルスチェックスレッド停止用ミューテックス (path ごと)。
size_t crypto_buf_size
crypto_buf のサイズ (バイト)。
PotrMutex peers_mutex
ピアテーブル保護用ミューテックス。
PotrSocket sock[POTR_MAX_PATH]
各パスの UDP ソケット。
volatile int health_running[POTR_MAX_PATH]
ヘルスチェックスレッド実行フラグ (1: 実行中, 0: 停止)。path ごと。
PotrWindow send_window
送信バッファ (過去 N パケット保持。NACK 再送・REJECT 判定に使用)。
volatile int send_thread_running
送信スレッド実行フラグ (1: 実行中, 0: 停止)。
struct sockaddr_in dest_addr[POTR_MAX_PATH]
送信先ソケットアドレス (送信者が sendto に使用)。
int64_t session_tv_sec
自セッション開始時刻 秒部。
PotrSocket tcp_conn_fd[POTR_MAX_PATH]
アクティブ TCP 接続 fd (path ごと)。
uint16_t max_payload
最大ペイロード長 (バイト)。
int32_t session_tv_nsec
セッション開始時刻 ナノ秒部。
int64_t service_id
サービス識別子。
uint32_t session_id
セッション識別子 (乱数)。
int64_t session_tv_sec
セッション開始時刻 秒部。
uint32_t session_id
セッション識別子 (NBO)。potrOpenService 時に決定する乱数。
uint16_t flags
パケット種別フラグ (POTR_FLAG_*) (NBO)。
uint32_t seq_num
通番。送信側が付与する連番 (NBO)。
uint16_t payload_len
ペイロード長 (バイト) (NBO)。
送信キューの 1 エントリ。ペイロードエレメント 1 個分のデータを保持する。
uint16_t payload_len
ペイロード長 (バイト)。
uint16_t flags
ペイロードエレメントフラグ (MORE_FRAG, COMPRESSED など)。
uint8_t * payload
ペイロードデータへのポインタ (プールスロット内を指す)。
PotrPeerId peer_id
送信先ピア識別子 (N:1 モード用。1:1 モードでは 0)。
PotrWindow send_window
送信ウィンドウ (NACK 再送用)。
int32_t session_tv_nsec
自セッション開始時刻 ナノ秒部。
int64_t session_tv_sec
自セッション開始時刻 秒部。
struct sockaddr_in dest_addr[POTR_MAX_PATH]
送信先ソケットアドレス (インデックス = ctx->sock[] の添字)。未使用スロットは sin_family == 0。
uint32_t session_id
自セッション識別子 (乱数)。
PotrPeerId peer_id
外部公開用ピア識別子 (単調増加カウンタから付与)。
PotrMutex send_window_mutex
send_window 保護 (送信・受信・ヘルスチェックスレッド競合)。
int encrypt_enabled
非 0 のとき暗号化有効。設定ファイルに有効な encrypt_key が存在するときに 1 に設定される。
int64_t service_id
サービス ID。
uint32_t pack_wait_ms
パッキング待ち時間 (ミリ秒)。最初の送信要求からこの時間だけ待ち合わせ、複数メッセージを 1 パケットにまとめる。0 = 即時送信 (パッキング待ちなし)。パケット容量が満杯になった場合はタイマーを無...
uint8_t encrypt_key[POTR_CRYPTO_KEY_SIZE]
AES-256-GCM 事前共有鍵 (32 バイト)。encrypt_enabled が 0 の場合は未使用。
uint32_t next_seq
送信側: 次に割り当てる通番。受信側: 次に期待する通番。
int window_send_push(PotrWindow *win, const PotrPacket *packet)
送信ウィンドウにパケットを積みます。
スライディングウィンドウ管理モジュールの内部ヘッダー。