Document of c-modernization-kit (porter) 1.0.0
Loading...
Searching...
No Matches
potrSend.c
Go to the documentation of this file.
1
13
14#include <stdlib.h>
15#include <inttypes.h>
16
17#include <porter_const.h>
18#include <porter.h>
19
20#include "../potrContext.h"
21#include "../potrPeerTable.h"
24#include "../infra/potrLog.h"
25
26#ifndef _WIN32
27 #include <pthread.h>
28 typedef pthread_mutex_t PotrMutexLocal;
29 #define POTR_MUTEX_LOCK_LOCAL(m) pthread_mutex_lock(m)
30 #define POTR_MUTEX_UNLOCK_LOCAL(m) pthread_mutex_unlock(m)
31#else /* _WIN32 */
32 #include <winsock2.h>
33 typedef CRITICAL_SECTION PotrMutexLocal;
34 #define POTR_MUTEX_LOCK_LOCAL(m) EnterCriticalSection(m)
35 #define POTR_MUTEX_UNLOCK_LOCAL(m) LeaveCriticalSection(m)
36#endif /* _WIN32 */
37
38/* N:1 モードで 1 ピアへ send を行う内部実装 (peers_mutex 取得不要・呼び出し元で検索済み) */
39static int send_to_peer(struct PotrContext_ *ctx, PotrPeerId peer_id,
40 const uint8_t *ptr, size_t len, int flags,
41 uint16_t base_flags)
42{
43 size_t remaining = len;
44 size_t max_payload;
45
47 if (ctx->service.encrypt_enabled)
48 {
49 max_payload -= POTR_CRYPTO_TAG_SIZE;
50 }
51
52 if ((flags & POTR_SEND_BLOCKING) != 0)
53 {
55 }
56
57 while (remaining > 0)
58 {
59 size_t chunk = (remaining > max_payload) ? max_payload : remaining;
60 int more_frag = (remaining > chunk);
61 uint16_t elem_flags = base_flags;
62
63 if (more_frag)
64 {
65 elem_flags |= POTR_FLAG_MORE_FRAG;
66 }
67
68 if (potr_send_queue_push_wait(&ctx->send_queue, peer_id,
69 elem_flags, ptr, (uint16_t)chunk,
71 {
72 return POTR_ERROR;
73 }
74
75 ptr += chunk;
76 remaining -= chunk;
77 }
78
79 if ((flags & POTR_SEND_BLOCKING) != 0)
80 {
82 }
83
84 return POTR_SUCCESS;
85}
86
87/* doxygen コメントは、ヘッダに記載 */
89 const void *data, size_t len, int flags)
90{
91 struct PotrContext_ *ctx = (struct PotrContext_ *)handle;
92 const uint8_t *ptr = (const uint8_t *)data;
93 uint16_t base_flags = 0;
94
95 if (ctx == NULL || data == NULL || len == 0
96 || len > (size_t)ctx->global.max_message_size)
97 {
99 "potrSend: invalid argument (handle=%p data=%p len=%zu max=%u)",
100 (const void *)handle, data, len,
101 (ctx != NULL) ? (unsigned)ctx->global.max_message_size : 0U);
102 return POTR_ERROR;
103 }
104
106 "potrSend: service_id=%" PRId64 " peer_id=%u len=%zu flags=0x%x",
107 ctx->service.service_id, (unsigned)peer_id, len, (unsigned)flags);
108
109 /* TCP: 全 path 切断中の場合は POTR_ERROR_DISCONNECTED を返す */
110 if (potr_is_tcp_type(ctx->service.type) && ctx->tcp_active_paths == 0)
111 {
113 "potrSend: service_id=%" PRId64 " TCP not connected",
114 ctx->service.service_id);
116 }
117
118 /* RAW モードは常にブロッキング送信 */
119 if (potr_is_raw_type(ctx->service.type))
120 {
121 flags |= POTR_SEND_BLOCKING;
122 }
123
124 /* 圧縮が要求された場合はペイロード全体を圧縮する */
125 if ((flags & POTR_SEND_COMPRESS) != 0)
126 {
127 size_t cmp_len = ctx->compress_buf_size;
128
129 if (potr_compress(ctx->compress_buf, &cmp_len,
130 (const uint8_t *)data, len) != 0)
131 {
133 "potrSend: service_id=%" PRId64 " compression failed (len=%zu)",
134 ctx->service.service_id, len);
135 return POTR_ERROR;
136 }
137
138 if (cmp_len < len)
139 {
141 "potrSend: service_id=%" PRId64 " compress %zu -> %zu bytes",
142 ctx->service.service_id, len, cmp_len);
143 ptr = ctx->compress_buf;
144 len = cmp_len;
145 base_flags = POTR_FLAG_COMPRESSED;
146 }
147 else
148 {
150 "potrSend: service_id=%" PRId64 " compression skipped"
151 " (compressed %zu >= original %zu bytes), sending uncompressed",
152 ctx->service.service_id, cmp_len, len);
153 /* 圧縮効果なし: 非圧縮のまま送信 (ptr, len, base_flags は初期値を維持) */
154 }
155 }
156
157 /* N:1 モード: peer_id に基づくルーティング */
158 if (ctx->is_multi_peer)
159 {
160 if (peer_id == POTR_PEER_NA)
161 {
163 "potrSend: service_id=%" PRId64 " N:1 mode requires valid peer_id (got POTR_PEER_NA)",
164 ctx->service.service_id);
165 return POTR_ERROR;
166 }
167
168 if (peer_id == POTR_PEER_ALL)
169 {
170 /* 全アクティブピアへ送信: peers_mutex を保持しない状態で送信するため
171 * まず peer_id リストを収集してからキューに積む */
172 PotrPeerId *ids;
173 int n_ids = 0;
174 int i;
175 int result = POTR_SUCCESS;
176
177 ids = (PotrPeerId *)malloc((size_t)ctx->max_peers * sizeof(PotrPeerId));
178 if (ids == NULL)
179 {
181 "potrSend: service_id=%" PRId64 " PEER_ALL malloc failed",
182 ctx->service.service_id);
183 return POTR_ERROR;
184 }
185
187 for (i = 0; i < ctx->max_peers; i++)
188 {
189 if (ctx->peers[i].active)
190 {
191 ids[n_ids++] = ctx->peers[i].peer_id;
192 }
193 }
195
196 for (i = 0; i < n_ids; i++)
197 {
198 if (send_to_peer(ctx, ids[i], ptr, len, flags, base_flags) != POTR_SUCCESS)
199 {
200 result = POTR_ERROR;
201 }
202 }
203 free(ids);
204 return result;
205 }
206 else
207 {
208 /* 指定ピアへ送信: 存在確認だけ mutex で保護し、送信は mutex 外で行う */
210 if (peer_find_by_id(ctx, peer_id) == NULL)
211 {
214 "potrSend: service_id=%" PRId64 " peer_id=%u not found",
215 ctx->service.service_id, (unsigned)peer_id);
216 return POTR_ERROR;
217 }
219
220 return send_to_peer(ctx, peer_id, ptr, len, flags, base_flags);
221 }
222 }
223
224 /* 1:1 モード: peer_id は無視 */
225 return send_to_peer(ctx, POTR_PEER_NA, ptr, len, flags, base_flags);
226}
データ圧縮・解凍モジュールの内部ヘッダー。
int potr_compress(uint8_t *dst, size_t *dst_len, const uint8_t *src, size_t src_len)
データを圧縮します。
#define POTR_CRYPTO_TAG_SIZE
AES-256-GCM 認証タグサイズ (バイト)。暗号文の直後に付加する。
#define POTR_FLAG_MORE_FRAG
後続フラグメントが存在することを示すペイロードエレメントフラグ。メッセージが複数ペイロードエレメントに分割された場合、最終フラグメント以外に設定する。
#define POTR_FLAG_COMPRESSED
ペイロードが圧縮されていることを示すペイロードエレメントフラグ。圧縮はメッセージ単位で行い、全フラグメントのペイロードエレメントに設定する。先頭 4 バイトが元サイズ (NBO)、続くデータが raw ...
#define POTR_PAYLOAD_ELEM_HDR_SIZE
パックコンテナ内ペイロードエレメントのヘッダーサイズ (バイト)。flags (2): POTR_FLAG_MORE_FRAG / POTR_FLAG_COMPRESSED を格納 + payload_...
#define POTR_PEER_NA
ピア ID 未割当を示す予約値。 1:1 モードのコールバックで渡される (ピアの概念がない)。 potrSend() に N:1 モードで指定した場合はエラーを返す。
#define POTR_PEER_ALL
全接続ピアへの一斉送信を指示する予約ピア ID。 N:1 モードでは全アクティブピアへユニキャスト送信する。 1:1 モードでは唯一のピアへの送信として動作する。
#define POTR_SUCCESS
成功の戻り値を表す定数。
#define POTR_ERROR
失敗の戻り値を表す定数。
#define POTR_ERROR_DISCONNECTED
TCP 全 path 切断中に potrSend() を呼んだ場合の戻り値。
#define POTR_SEND_COMPRESS
メッセージを圧縮して送信します。圧縮後のサイズが元のサイズ以上の場合は自動的に非圧縮で送信します。
#define POTR_SEND_BLOCKING
ブロッキング送信を行います。0 を指定するとノンブロッキング送信を行います。
通信ライブラリ (動的リンク用) のヘッダーファイル。
#define POTR_API
呼び出し規約マクロ。
Definition porter.h:46
#define POTR_EXPORT
DLL エクスポート/インポート制御マクロ。
Definition porter.h:37
通信ライブラリの定数ファイル。
@ POTR_TRACE_ERROR
エラー。操作の失敗を記録。TRACE_LV_ERROR (1) と同値。
@ POTR_TRACE_VERBOSE
詳細情報 (デバッグ)。TRACE_LV_VERBOSE (4) と同値。
struct PotrContext_ * PotrHandle
セッションハンドル。
uint32_t PotrPeerId
ピア識別子。
Definition porter_type.h:32
セッションコンテキスト内部定義ヘッダー。
static int potr_is_raw_type(PotrType t)
RAW 系通信種別 (POTR_TYPE_*_RAW) か判定する。
Definition potrContext.h:54
static int potr_is_tcp_type(PotrType t)
TCP 通信種別 (POTR_TYPE_TCP / POTR_TYPE_TCP_BIDIR) か判定する。
Definition potrContext.h:48
pthread_mutex_t PotrMutexLocal
porter 内部ログマクロ定義ヘッダー。
#define POTR_LOG(level,...)
porter 内部ログ出力マクロ。
Definition potrLog.h:68
PotrPeerContext * peer_find_by_id(struct PotrContext_ *ctx, PotrPeerId peer_id)
peer_id でピアを検索する。
N:1 モード用ピアテーブル管理モジュールの内部ヘッダー。
int potr_send_queue_push_wait(PotrSendQueue *q, PotrPeerId peer_id, uint16_t flags, const void *payload, uint16_t payload_len, volatile int *running)
void potr_send_queue_wait_drained(PotrSendQueue *q)
非同期送信キューの型定義と操作関数。
#define POTR_MUTEX_LOCK_LOCAL(m)
Definition potrSend.c:29
POTR_EXPORT int POTR_API potrSend(PotrHandle handle, PotrPeerId peer_id, const void *data, size_t len, int flags)
メッセージを送信します。
Definition potrSend.c:88
#define POTR_MUTEX_UNLOCK_LOCAL(m)
Definition potrSend.c:30
static int send_to_peer(struct PotrContext_ *ctx, PotrPeerId peer_id, const uint8_t *ptr, size_t len, int flags, uint16_t base_flags)
Definition potrSend.c:39
セッションコンテキスト構造体。PotrHandle の実体。
volatile int tcp_active_paths
アクティブ TCP path 数 (0 = 全切断)。
PotrGlobalConfig global
グローバル設定。
size_t compress_buf_size
compress_buf のサイズ (バイト)。
PotrSendQueue send_queue
非同期送信キュー。
uint8_t * compress_buf
圧縮・解凍用一時バッファ (動的確保)。
int is_multi_peer
1: N:1 モード (src_addr/src_port 省略), 0: 1:1 モード。
PotrServiceDef service
サービス定義。
PotrMutex peers_mutex
ピアテーブル保護用ミューテックス。
volatile int send_thread_running
送信スレッド実行フラグ (1: 実行中, 0: 停止)。
int max_peers
ピアテーブルサイズ (service.max_peers から取得)。
PotrPeerContext * peers
ピアテーブル (動的確保。max_peers エントリ)。
uint16_t max_payload
最大ペイロード長 (バイト)。
uint32_t max_message_size
1 回の potrSend で送信できる最大メッセージ長 (バイト)。デフォルト: POTR_MAX_MESSAGE_SIZE。
int active
1: 有効スロット, 0: 空き。
PotrPeerId peer_id
外部公開用ピア識別子 (単調増加カウンタから付与)。
int encrypt_enabled
非 0 のとき暗号化有効。設定ファイルに有効な encrypt_key が存在するときに 1 に設定される。
PotrType type
通信種別。
int64_t service_id
サービス ID。