Document of c-modernization-kit (porter) 1.0.0
Loading...
Searching...
No Matches
potrSendQueue.c
Go to the documentation of this file.
1
13
14#include <stdlib.h>
15#include <string.h>
16
17#ifndef _WIN32
18 #include <time.h>
19#endif /* _WIN32 */
20
21#include <porter_const.h>
22
23#include "potrSendQueue.h"
24
25/* --------------------------------------------------------------------------
26 * プラットフォーム別 ミューテックス・条件変数 ラッパーマクロ
27 * -------------------------------------------------------------------------- */
28#ifndef _WIN32
29 #define POTR_MUTEX_INIT(m) pthread_mutex_init((m), NULL)
30 #define POTR_MUTEX_LOCK(m) pthread_mutex_lock(m)
31 #define POTR_MUTEX_UNLOCK(m) pthread_mutex_unlock(m)
32 #define POTR_MUTEX_DESTROY(m) pthread_mutex_destroy(m)
33 #define POTR_COND_INIT(c) pthread_cond_init((c), NULL)
34 #define POTR_COND_WAIT(c, m) pthread_cond_wait((c), (m))
35 #define POTR_COND_SIGNAL(c) pthread_cond_signal(c)
36 #define POTR_COND_BROADCAST(c) pthread_cond_broadcast(c)
37 #define POTR_COND_DESTROY(c) pthread_cond_destroy(c)
38#else /* _WIN32 */
39 #define POTR_MUTEX_INIT(m) InitializeCriticalSection(m)
40 #define POTR_MUTEX_LOCK(m) EnterCriticalSection(m)
41 #define POTR_MUTEX_UNLOCK(m) LeaveCriticalSection(m)
42 #define POTR_MUTEX_DESTROY(m) DeleteCriticalSection(m)
43 #define POTR_COND_INIT(c) InitializeConditionVariable(c)
44 #define POTR_COND_WAIT(c, m) SleepConditionVariableCS((c), (m), INFINITE)
45 #define POTR_COND_SIGNAL(c) WakeConditionVariable(c)
46 #define POTR_COND_BROADCAST(c) WakeAllConditionVariable(c)
47 #define POTR_COND_DESTROY(c) ((void)0) /* Windows は破棄不要 */
48#endif /* _WIN32 */
49
50/* doxygen コメントは、ヘッダに記載 */
51int potr_send_queue_init(PotrSendQueue *q, size_t depth, uint16_t max_payload)
52{
53 size_t i;
54
55 memset(q, 0, sizeof(*q));
56
57 q->entries = (PotrPayloadElem *)malloc(depth * sizeof(PotrPayloadElem));
58 q->payload_pool = (uint8_t *)malloc(depth * (size_t)max_payload);
59
60 if (q->entries == NULL || q->payload_pool == NULL)
61 {
62 free(q->entries);
63 free(q->payload_pool);
64 q->entries = NULL;
65 q->payload_pool = NULL;
66 return POTR_ERROR;
67 }
68
69 q->depth = depth;
70
71 for (i = 0; i < depth; i++)
72 {
74 q->entries[i].flags = 0;
75 q->entries[i].payload_len = 0;
76 q->entries[i].payload = q->payload_pool + i * (size_t)max_payload;
77 }
78
83 return POTR_SUCCESS;
84}
85
86/* doxygen コメントは、ヘッダに記載 */
88{
93 free(q->entries);
94 free(q->payload_pool);
95 q->entries = NULL;
96 q->payload_pool = NULL;
97}
98
99/* doxygen コメントは、ヘッダに記載 */
101 uint16_t flags,
102 const void *payload, uint16_t payload_len)
103{
105
106 if (q->count + q->inflight >= q->depth)
107 {
109 return POTR_ERROR;
110 }
111
112 q->entries[q->tail].peer_id = peer_id;
113 q->entries[q->tail].flags = flags;
114 q->entries[q->tail].payload_len = payload_len;
115 memcpy(q->entries[q->tail].payload, payload, payload_len);
116 q->tail = (q->tail + 1U) % q->depth;
117 q->count++;
118
121
122 return POTR_SUCCESS;
123}
124
125/* doxygen コメントは、ヘッダに記載 */
127 uint16_t flags,
128 const void *payload, uint16_t payload_len,
129 volatile int *running)
130{
132
133 /* count + inflight < depth が保証されるまで待機する。
134 inflight エントリもプールスロットを占有するため、count だけでは不足。 */
135 while (q->count + q->inflight >= q->depth)
136 {
137 if (!*running)
138 {
140 return POTR_ERROR;
141 }
142 POTR_COND_WAIT(&q->not_full, &q->mutex);
143 }
144
145 q->entries[q->tail].peer_id = peer_id;
146 q->entries[q->tail].flags = flags;
147 q->entries[q->tail].payload_len = payload_len;
148 memcpy(q->entries[q->tail].payload, payload, payload_len);
149 q->tail = (q->tail + 1U) % q->depth;
150 q->count++;
151
154
155 return POTR_SUCCESS;
156}
157
158/* doxygen コメントは、ヘッダに記載 */
160{
162
163 while (q->count == 0)
164 {
165 if (!*running)
166 {
168 return POTR_ERROR;
169 }
171 }
172
173 *out = q->entries[q->head];
174 q->head = (q->head + 1U) % q->depth;
175 q->count--;
176 q->inflight++;
177
178 /* count + inflight は変化しない (count-- と inflight++ が相殺) ため
179 not_full シグナルは complete() が担う */
181 return POTR_SUCCESS;
182}
183
184/* doxygen コメントは、ヘッダに記載 */
186{
188
189 if (q->count == 0)
190 {
192 return POTR_ERROR;
193 }
194
195 *out = q->entries[q->head]; /* head は送信スレッドのみが変更するので安全 */
196
198 return POTR_SUCCESS;
199}
200
201/* doxygen コメントは、ヘッダに記載 */
203 uint32_t timeout_ms)
204{
206
207 if (q->count == 0)
208 {
209#ifndef _WIN32
210 struct timespec abs_ts;
211 clock_gettime(CLOCK_REALTIME, &abs_ts);
212 abs_ts.tv_sec += (time_t)(timeout_ms / 1000U);
213 abs_ts.tv_nsec += (long)((timeout_ms % 1000U) * 1000000UL);
214 if (abs_ts.tv_nsec >= 1000000000L)
215 {
216 abs_ts.tv_sec++;
217 abs_ts.tv_nsec -= 1000000000L;
218 }
219 pthread_cond_timedwait(&q->not_empty, &q->mutex, &abs_ts);
220#else /* _WIN32 */
221 SleepConditionVariableCS(&q->not_empty, &q->mutex, (DWORD)timeout_ms);
222#endif /* _WIN32 */
223 }
224
225 if (q->count == 0)
226 {
228 return POTR_ERROR;
229 }
230
231 *out = q->entries[q->head];
232
234 return POTR_SUCCESS;
235}
236
237/* doxygen コメントは、ヘッダに記載 */
239{
241
242 if (q->count == 0)
243 {
245 return POTR_ERROR;
246 }
247
248 *out = q->entries[q->head];
249 q->head = (q->head + 1U) % q->depth;
250 q->count--;
251 q->inflight++;
252
254 return POTR_SUCCESS;
255}
256
257/* doxygen コメントは、ヘッダに記載 */
259{
261
262 if (q->inflight > 0U)
263 {
264 q->inflight--;
265 }
266
267 if (q->count == 0U && q->inflight == 0U)
268 {
270 }
271
272 /* inflight 減少により count + inflight < depth となる可能性があるため
273 push_wait で待機中のスレッドを起床させる */
275
277}
278
279/* doxygen コメントは、ヘッダに記載 */
281{
283
284 while (q->count > 0U || q->inflight > 0U)
285 {
286 POTR_COND_WAIT(&q->drained, &q->mutex);
287 }
288
290}
291
292/* doxygen コメントは、ヘッダに記載 */
#define POTR_PEER_NA
ピア ID 未割当を示す予約値。 1:1 モードのコールバックで渡される (ピアの概念がない)。 potrSend() に N:1 モードで指定した場合はエラーを返す。
#define POTR_SUCCESS
成功の戻り値を表す定数。
#define POTR_ERROR
失敗の戻り値を表す定数。
通信ライブラリの定数ファイル。
uint32_t PotrPeerId
ピア識別子。
Definition porter_type.h:32
#define POTR_COND_DESTROY(c)
#define POTR_MUTEX_UNLOCK(m)
int potr_send_queue_init(PotrSendQueue *q, size_t depth, uint16_t max_payload)
#define POTR_MUTEX_LOCK(m)
int potr_send_queue_push(PotrSendQueue *q, PotrPeerId peer_id, uint16_t flags, const void *payload, uint16_t payload_len)
void potr_send_queue_complete(PotrSendQueue *q)
int potr_send_queue_pop(PotrSendQueue *q, PotrPayloadElem *out, volatile int *running)
int potr_send_queue_peek_timed(PotrSendQueue *q, PotrPayloadElem *out, uint32_t timeout_ms)
#define POTR_COND_SIGNAL(c)
#define POTR_COND_INIT(c)
#define POTR_MUTEX_INIT(m)
int potr_send_queue_peek(PotrSendQueue *q, PotrPayloadElem *out)
void potr_send_queue_shutdown(PotrSendQueue *q)
int potr_send_queue_try_pop(PotrSendQueue *q, PotrPayloadElem *out)
#define POTR_COND_WAIT(c, m)
void potr_send_queue_destroy(PotrSendQueue *q)
#define POTR_COND_BROADCAST(c)
int potr_send_queue_push_wait(PotrSendQueue *q, PotrPeerId peer_id, uint16_t flags, const void *payload, uint16_t payload_len, volatile int *running)
#define POTR_MUTEX_DESTROY(m)
void potr_send_queue_wait_drained(PotrSendQueue *q)
非同期送信キューの型定義と操作関数。
送信キューの 1 エントリ。ペイロードエレメント 1 個分のデータを保持する。
uint16_t payload_len
ペイロード長 (バイト)。
uint16_t flags
ペイロードエレメントフラグ (MORE_FRAG, COMPRESSED など)。
uint8_t * payload
ペイロードデータへのポインタ (プールスロット内を指す)。
PotrPeerId peer_id
送信先ピア識別子 (N:1 モード用。1:1 モードでは 0)。
非同期送信キュー。
size_t depth
キュー容量 (エントリ数)。
size_t count
キュー内エントリ数。
size_t inflight
sendto 実行中エントリ数。
PotrMutex mutex
排他制御。
PotrCondVar not_empty
count > 0 になったことを通知する条件変数。
PotrCondVar drained
count == 0 && inflight == 0 を通知する条件変数。
size_t head
読み出し位置 (送信スレッドが使用)。
size_t tail
書き込み位置 (potrSend 呼び出し元が使用)。
PotrCondVar not_full
count + inflight < depth になったことを通知する条件変数。
uint8_t * payload_pool
ペイロードプール (動的確保。depth × max_payload バイト)。
PotrPayloadElem * entries
ペイロードエレメントバッファ (動的確保。depth 要素)。
static volatile sig_atomic_t running