Document of c-modernization-kit (porter) 1.0.0
Loading...
Searching...
No Matches
tcpServer_windows.c
Go to the documentation of this file.
1
26
27#ifdef _WIN32
28
29#include "tcpServer.h" /* WIN32_LEAN_AND_MEAN / windows.h / winsock2.h / ws2tcpip.h を内包 */
30
31#include <stdio.h>
32#include <stdlib.h>
33#include <string.h>
34#include <process.h> /* _beginthreadex */
35
37#define TCPSERVER_PIPE_NAME_FMT "\\\\.\\pipe\\tcpserver_worker_%d"
38
39/* ============================================================
40 * 内部型定義
41 * ============================================================ */
42
48typedef struct {
49 HANDLE pipe;
50 HANDLE process;
51 int conn_count;
52} WorkerInfo;
53
59typedef struct {
60 int id;
61 WorkerInfo *workers;
62 HANDLE *events;
63 int conns_per_worker;
64} WorkerMonitorArg;
65
66/* ============================================================
67 * 内部ヘルパー
68 * ============================================================ */
69
79static SOCKET create_listen_socket(int port) {
80 struct addrinfo hints = {0}, *result;
81 char port_str[8];
82
83 sprintf_s(port_str, sizeof(port_str), "%d", port);
84
85 hints.ai_family = AF_INET;
86 hints.ai_socktype = SOCK_STREAM;
87 hints.ai_protocol = IPPROTO_TCP;
88 hints.ai_flags = AI_PASSIVE;
89
90 if (getaddrinfo(NULL, port_str, &hints, &result) != 0) {
91 fprintf(stderr, "getaddrinfo 失敗\n");
92 exit(1);
93 }
94
95 SOCKET listen_socket = socket(result->ai_family, result->ai_socktype, result->ai_protocol);
96 if (listen_socket == INVALID_SOCKET) {
97 fprintf(stderr, "socket 失敗\n");
98 freeaddrinfo(result);
99 exit(1);
100 }
101
102 if (bind(listen_socket, result->ai_addr, (int)result->ai_addrlen) == SOCKET_ERROR) {
103 fprintf(stderr, "bind 失敗\n");
104 freeaddrinfo(result);
105 exit(1);
106 }
107
108 freeaddrinfo(result);
109
110 if (listen(listen_socket, 128) == SOCKET_ERROR) {
111 fprintf(stderr, "listen 失敗\n");
112 exit(1);
113 }
114
115 return listen_socket;
116}
117
126__declspec(noreturn) static void run_as_fork_child(SOCKET client_socket) {
127 g_session_fn(client_socket);
129 ExitProcess(0);
130}
131
147static void worker_loop(const char *pipe_name, int conns_per_worker) {
148 DWORD bytes_read, bytes_written;
149
150 printf("[ワーカー PID %lu] 起動完了\n", GetCurrentProcessId());
151
152 HANDLE pipe = CreateFileA(
153 pipe_name,
154 GENERIC_READ | GENERIC_WRITE,
155 0, NULL, OPEN_EXISTING, 0, NULL
156 );
157
158 if (pipe == INVALID_HANDLE_VALUE) {
159 fprintf(stderr, "[ワーカー] パイプ接続失敗\n");
160 return;
161 }
162
163 if (conns_per_worker == 1) {
164 /* --- 従来の逐次処理 --- */
165 while (1) {
166 SOCKET client_socket;
167 if (!ReadFile(pipe, &client_socket, sizeof(client_socket), &bytes_read, NULL)) {
168 break;
169 }
170 if (bytes_read != sizeof(client_socket)) {
171 break;
172 }
173
174 printf("[ワーカー PID %lu] クライアント接続\n", GetCurrentProcessId());
175 g_session_fn(client_socket);
176 printf("[ワーカー PID %lu] 次のソケット待機\n", GetCurrentProcessId());
177
178 /* 親に処理完了を通知 */
179 char done = 1;
180 WriteFile(pipe, &done, 1, &bytes_written, NULL);
181 }
182 } else {
183 /* --- PeekNamedPipe + select による多重接続処理 --- */
184 SOCKET *active = (SOCKET *)malloc((size_t)conns_per_worker * sizeof(SOCKET));
185 if (!active) {
186 CloseHandle(pipe);
187 return;
188 }
189
190 int active_count = 0;
191 char buf[BUFFER_SIZE];
192
193 while (1) {
194 /* パイプから新規ソケットを非ブロッキングチェック */
195 DWORD avail = 0;
196 if (!PeekNamedPipe(pipe, NULL, 0, NULL, &avail, NULL)) {
197 break; /* パイプ切断 */
198 }
199 if (avail >= sizeof(SOCKET)) {
200 SOCKET new_sock;
201 if (ReadFile(pipe, &new_sock, sizeof(new_sock), &bytes_read, NULL)
202 && bytes_read == sizeof(new_sock)) {
203 active[active_count++] = new_sock;
204 printf("[ワーカー PID %lu] 新規接続 (計 %d 接続)\n",
205 GetCurrentProcessId(), active_count);
206 }
207 }
208
209 if (active_count == 0) {
210 /* アクティブな接続がない場合は少し待ってパイプを再チェック */
211 Sleep(10);
212 continue;
213 }
214
215 /* select で全アクティブソケットを監視 (10ms タイムアウト) */
216 fd_set read_fds;
217 FD_ZERO(&read_fds);
218 for (int i = 0; i < active_count; i++) {
219 FD_SET(active[i], &read_fds);
220 }
221 struct timeval tv = {0, 10000}; /* 10ms */
222 /* Windows では select の第 1 引数は無視される */
223 select(0, &read_fds, NULL, NULL, &tv);
224
225 /* 末尾から走査して配列詰めを安全に行う */
226 for (int i = active_count - 1; i >= 0; i--) {
227 if (!FD_ISSET(active[i], &read_fds)) {
228 continue;
229 }
230
231 int n = recv(active[i], buf, BUFFER_SIZE, 0);
232 if (n <= 0) {
233 closesocket(active[i]);
234 active[i] = active[--active_count]; /* 末尾と入れ替えて削除 */
235 printf("[ワーカー PID %lu] 接続終了 (残 %d 接続)\n",
236 GetCurrentProcessId(), active_count);
237 /* 親に接続 1 本分の完了を通知 */
238 char done = 1;
239 WriteFile(pipe, &done, 1, &bytes_written, NULL);
240 } else {
241 send(active[i], buf, n, 0);
242 }
243 }
244 }
245
246 free(active);
247 }
248
249 CloseHandle(pipe);
250 printf("[ワーカー PID %lu] 終了\n", GetCurrentProcessId());
251}
252
263static unsigned __stdcall worker_monitor_thread(void *arg) {
264 WorkerMonitorArg *ctx = (WorkerMonitorArg *)arg;
265 int id = ctx->id;
266 WorkerInfo *workers = ctx->workers;
267 HANDLE *events = ctx->events;
268 int conns_per_worker = ctx->conns_per_worker;
269 char done;
270 DWORD bytes_read;
271
272 while (1) {
273 if (ReadFile(workers[id].pipe, &done, 1, &bytes_read, NULL)) {
274 /* 接続 1 本分の完了通知 → conn_count を減らし、空きができたらイベントをシグナル */
275 workers[id].conn_count--;
276 if (workers[id].conn_count < conns_per_worker) {
277 SetEvent(events[id]);
278 }
279 } else {
280 break;
281 }
282 }
283 return 0;
284}
285
299static int find_available_worker(WorkerInfo *workers, HANDLE *events, int n,
300 int conns_per_worker) {
301 while (1) {
302 for (int i = 0; i < n; i++) {
303 if (workers[i].conn_count < conns_per_worker) {
304 return i;
305 }
306 }
307 WaitForMultipleObjects((DWORD)n, events, FALSE, INFINITE);
308 }
309}
310
324static void start_prefork_workers(WorkerInfo *workers, HANDLE *events, int n,
325 int conns_per_worker) {
326 char pipe_name[64];
327 char cmdline[MAX_PATH + 128];
328 char exepath[MAX_PATH];
329
330 GetModuleFileNameA(NULL, exepath, MAX_PATH);
331
332 /* 監視スレッド引数はプロセス終了まで有効である必要があるため heap 確保 */
333 WorkerMonitorArg *args = (WorkerMonitorArg *)malloc((size_t)n * sizeof(WorkerMonitorArg));
334 if (!args) {
335 fprintf(stderr, "malloc 失敗\n");
336 exit(1);
337 }
338
339 for (int i = 0; i < n; i++) {
340 sprintf_s(pipe_name, sizeof(pipe_name), TCPSERVER_PIPE_NAME_FMT, i);
341
342 workers[i].pipe = CreateNamedPipeA(
343 pipe_name,
344 PIPE_ACCESS_DUPLEX,
345 PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT,
346 1, 4096, 4096, 0, NULL
347 );
348
349 if (workers[i].pipe == INVALID_HANDLE_VALUE) {
350 fprintf(stderr, "パイプ作成失敗: %d\n", i);
351 free(args);
352 exit(1);
353 }
354
355 /* イベント作成 (初期状態: 空き) */
356 events[i] = CreateEvent(NULL, FALSE, TRUE, NULL);
357
358 /* ワーカープロセス起動 (--conns-per-worker を渡す) */
359 sprintf_s(cmdline, sizeof(cmdline), "\"%s\" --worker %s --conns-per-worker %d",
360 exepath, pipe_name, conns_per_worker);
361
362 STARTUPINFOA si = {sizeof(si)};
363 PROCESS_INFORMATION pi;
364
365 if (!CreateProcessA(NULL, cmdline, NULL, NULL, TRUE, 0, NULL, NULL, &si, &pi)) {
366 fprintf(stderr, "ワーカー起動失敗: %d\n", i);
367 free(args);
368 exit(1);
369 }
370
371 workers[i].process = pi.hProcess;
372 workers[i].conn_count = 0;
373 CloseHandle(pi.hThread);
374
375 /* ワーカーがパイプに接続するのを待つ */
376 ConnectNamedPipe(workers[i].pipe, NULL);
377
378 /* 監視スレッド起動 */
379 args[i].id = i;
380 args[i].workers = workers;
381 args[i].events = events;
382 args[i].conns_per_worker = conns_per_worker;
383 _beginthreadex(NULL, 0, worker_monitor_thread, (void *)&args[i], 0, NULL);
384
385 printf("[親プロセス] ワーカー %d (PID %lu) 起動完了\n", i, pi.dwProcessId);
386 }
387
388 /* args はスレッドが使い続けるため解放しない (プロセス終了時に解放される) */
389}
390
391/* ============================================================
392 * プラットフォームフック
393 * ============================================================ */
394
395/* doxygen コメントは、ヘッダに記載 */
396void platform_init(ClientSessionFn session_fn) {
397 WSADATA wsaData;
398 if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0) {
399 fprintf(stderr, "WSAStartup 失敗\n");
400 exit(1);
401 }
402 g_session_fn = session_fn;
403}
404
405/* doxygen コメントは、ヘッダに記載 */
406void platform_cleanup(void) {
407 WSACleanup();
408}
409
410/* doxygen コメントは、ヘッダに記載 */
411int dispatch_internal_args(int argc, char *argv[]) {
412 for (int i = 1; i < argc - 1; i++) {
413 if (strcmp(argv[i], "--child") == 0) {
414 SOCKET sock = (SOCKET)_strtoui64(argv[i + 1], NULL, 10);
415 run_as_fork_child(sock);
416 }
417 if (strcmp(argv[i], "--worker") == 0) {
418 /* --conns-per-worker が続く場合は読み取る */
419 int conns_per_worker = DEFAULT_CONNS_PER_WORKER;
420 for (int j = i + 2; j < argc - 1; j++) {
421 if (strcmp(argv[j], "--conns-per-worker") == 0) {
422 conns_per_worker = atoi(argv[j + 1]);
423 break;
424 }
425 }
426 worker_loop(argv[i + 1], conns_per_worker);
427 return 1;
428 }
429 }
430 return 0;
431}
432
433/* ============================================================
434 * サーバー実装
435 * ============================================================ */
436
437/* doxygen コメントは、ヘッダに記載 */
438void run_fork_server(int port) {
439 SOCKET listen_socket = create_listen_socket(port);
440 char exepath[MAX_PATH];
441 GetModuleFileNameA(NULL, exepath, MAX_PATH);
442
443 printf("[親プロセス %lu] fork モード、ポート %d で待ち受け開始\n",
444 GetCurrentProcessId(), port);
445
446 while (1) {
447 SOCKET client_socket = accept(listen_socket, NULL, NULL);
448 if (client_socket == INVALID_SOCKET) {
449 continue;
450 }
451
452 printf("[親プロセス] 接続受付、子プロセス生成\n");
453
454 /* ソケットハンドルを継承可能に設定 */
455 SetHandleInformation((HANDLE)client_socket, HANDLE_FLAG_INHERIT, HANDLE_FLAG_INHERIT);
456
457 char cmdline[MAX_PATH + 64];
458 sprintf_s(cmdline, sizeof(cmdline), "\"%s\" --child %llu",
459 exepath, (unsigned long long)client_socket);
460
461 STARTUPINFOA si = {sizeof(si)};
462 PROCESS_INFORMATION pi;
463
464 if (CreateProcessA(NULL, cmdline, NULL, NULL, TRUE, 0, NULL, NULL, &si, &pi)) {
465 CloseHandle(pi.hProcess);
466 CloseHandle(pi.hThread);
467 }
468
469 closesocket(client_socket);
470 }
471}
472
473/* doxygen コメントは、ヘッダに記載 */
474void run_prefork_server(int port, int num_workers, int conns_per_worker) {
475 WorkerInfo *workers = (WorkerInfo *)malloc((size_t)num_workers * sizeof(WorkerInfo));
476 HANDLE *events = (HANDLE *)malloc((size_t)num_workers * sizeof(HANDLE));
477
478 if (!workers || !events) {
479 fprintf(stderr, "malloc 失敗\n");
480 exit(1);
481 }
482
483 SOCKET listen_socket = create_listen_socket(port);
484 printf("[親プロセス %lu] prefork モード、ワーカー %d 個を起動中 (1 ワーカーあたり最大 %d 接続)...\n",
485 GetCurrentProcessId(), num_workers, conns_per_worker);
486 start_prefork_workers(workers, events, num_workers, conns_per_worker);
487 printf("[親プロセス] ポート %d で待ち受け開始\n", port);
488
489 while (1) {
490 SOCKET client_socket = accept(listen_socket, NULL, NULL);
491 if (client_socket == INVALID_SOCKET) {
492 continue;
493 }
494
495 /* ソケットを継承可能に */
496 SetHandleInformation((HANDLE)client_socket, HANDLE_FLAG_INHERIT, HANDLE_FLAG_INHERIT);
497
498 int worker_id = find_available_worker(workers, events, num_workers, conns_per_worker);
499 workers[worker_id].conn_count++;
500
501 /* 容量に達した場合はイベントを非シグナルにして次の割り当てを防ぐ */
502 if (workers[worker_id].conn_count >= conns_per_worker) {
503 ResetEvent(events[worker_id]);
504 }
505
506 printf("[親プロセス] ワーカー %d に接続を割り当て (接続数: %d/%d)\n",
507 worker_id, workers[worker_id].conn_count, conns_per_worker);
508
509 DWORD bytes_written;
510 WriteFile(workers[worker_id].pipe, &client_socket, sizeof(client_socket),
511 &bytes_written, NULL);
512
513 /* ソケットはワーカーが処理完了後に closesocket する */
514 }
515
516 /* 到達しないが形式上記述 */
517 free(workers);
518 free(events);
519}
520
521#endif /* _WIN32 */
TCP サーバーサンプル共通定義。
void(* ClientSessionFn)(ClientFd fd)
セッション処理関数の型。
Definition tcpServer.h:99
void platform_init(ClientSessionFn session_fn)
プラットフォーム初期化 (Windows: WSAStartup / Linux: no-op)。
int dispatch_internal_args(int argc, char *argv[])
内部起動引数を処理します。
#define DEFAULT_CONNS_PER_WORKER
デフォルト 1 ワーカーあたりの同時接続数。
Definition tcpServer.h:74
void platform_cleanup(void)
プラットフォーム後処理 (Windows: WSACleanup / Linux: no-op)。
void run_prefork_server(int port, int num_workers, int conns_per_worker)
prefork モードのサーバーを起動します。
#define BUFFER_SIZE
送受信バッファサイズ (バイト)。
Definition tcpServer.h:76
ClientSessionFn g_session_fn
登録済みセッション処理関数。
void run_fork_server(int port)
fork モードのサーバーを起動します。
static void worker_loop(int server_fd, int worker_id, int conns_per_worker)
ワーカープロセスのメインループ (prefork モード用)。
static int create_listen_socket(int port)
listen ソケットを作成してバインドし、待ち受けを開始します。