/** printf-style format for a worker's named-pipe path; the single %d is filled with the worker index when the parent builds the pipe name. */
37#define TCPSERVER_PIPE_NAME_FMT "\\\\.\\pipe\\tcpserver_worker_%d"
80 struct addrinfo hints = {0}, *result;
83 sprintf_s(port_str,
sizeof(port_str),
"%d", port);
85 hints.ai_family = AF_INET;
86 hints.ai_socktype = SOCK_STREAM;
87 hints.ai_protocol = IPPROTO_TCP;
88 hints.ai_flags = AI_PASSIVE;
90 if (getaddrinfo(NULL, port_str, &hints, &result) != 0) {
91 fprintf(stderr,
"getaddrinfo 失敗\n");
95 SOCKET listen_socket = socket(result->ai_family, result->ai_socktype, result->ai_protocol);
96 if (listen_socket == INVALID_SOCKET) {
97 fprintf(stderr,
"socket 失敗\n");
102 if (bind(listen_socket, result->ai_addr, (
int)result->ai_addrlen) == SOCKET_ERROR) {
103 fprintf(stderr,
"bind 失敗\n");
104 freeaddrinfo(result);
108 freeaddrinfo(result);
110 if (listen(listen_socket, 128) == SOCKET_ERROR) {
111 fprintf(stderr,
"listen 失敗\n");
115 return listen_socket;
/**
 * Entry point for a process started with "--child <socket value>".
 *
 * @param client_socket  Socket value parsed from the command line by the
 *                       argument dispatcher.
 *
 * Declared noreturn. NOTE(review): the body is not visible in this listing,
 * so how the socket is served and how the process exits must be confirmed
 * against the full file.
 */
126__declspec(noreturn)
static void run_as_fork_child(SOCKET client_socket) {
/**
 * Main loop of a prefork worker process.
 *
 * Opens an existing pipe for read/write, receives client SOCKET values over
 * it, and writes a single `done` byte back to the pipe each time a
 * connection has finished.
 *
 * @param pipe_name         Name of the pipe to open (presumably the first
 *                          CreateFileA argument; that line is missing here).
 * @param conns_per_worker  1 selects the path that reads one SOCKET with a
 *                          blocking ReadFile; any other value selects the
 *                          path that keeps an array of that many SOCKETs and
 *                          polls them with select().
 *
 * NOTE(review): this listing omits many original lines (loop headers, recv
 * calls, error returns, closing braces). Comments below describe only what
 * the visible lines show.
 */
147static void worker_loop(
const char *pipe_name,
int conns_per_worker) {
148 DWORD bytes_read, bytes_written;
150 printf(
"[ワーカー PID %lu] 起動完了\n", GetCurrentProcessId());
/* Open the already-created pipe for both reading and writing. */
152 HANDLE pipe = CreateFileA(
154 GENERIC_READ | GENERIC_WRITE,
155 0, NULL, OPEN_EXISTING, 0, NULL
158 if (pipe == INVALID_HANDLE_VALUE) {
159 fprintf(stderr,
"[ワーカー] パイプ接続失敗\n");
/* Single-connection path: block on the pipe until one SOCKET value arrives. */
163 if (conns_per_worker == 1) {
166 SOCKET client_socket;
167 if (!ReadFile(pipe, &client_socket,
sizeof(client_socket), &bytes_read, NULL)) {
/* A short read is treated separately from a ReadFile failure. */
170 if (bytes_read !=
sizeof(client_socket)) {
174 printf(
"[ワーカー PID %lu] クライアント接続\n", GetCurrentProcessId());
176 printf(
"[ワーカー PID %lu] 次のソケット待機\n", GetCurrentProcessId());
/* Report completion with one byte. NOTE(review): the WriteFile result is not checked here. */
180 WriteFile(pipe, &done, 1, &bytes_written, NULL);
/* Multi-connection path: array sized for conns_per_worker sockets. */
184 SOCKET *active = (SOCKET *)malloc((
size_t)conns_per_worker *
sizeof(SOCKET));
190 int active_count = 0;
/* Non-blocking check of how many bytes are waiting in the pipe. */
196 if (!PeekNamedPipe(pipe, NULL, 0, NULL, &avail, NULL)) {
/* Only read once a whole SOCKET value is available. */
199 if (avail >=
sizeof(SOCKET)) {
201 if (ReadFile(pipe, &new_sock,
sizeof(new_sock), &bytes_read, NULL)
202 && bytes_read ==
sizeof(new_sock)) {
/* NOTE(review): no check of active_count against conns_per_worker is visible before this store; confirm the parent never over-assigns. */
203 active[active_count++] = new_sock;
204 printf(
"[ワーカー PID %lu] 新規接続 (計 %d 接続)\n",
205 GetCurrentProcessId(), active_count);
209 if (active_count == 0) {
/* Build the read set from every active socket. */
218 for (
int i = 0; i < active_count; i++) {
219 FD_SET(active[i], &read_fds);
/* Timeout of 0 s + 10000 us (10 ms). */
221 struct timeval tv = {0, 10000};
/* NOTE(review): the select() result appears to be discarded; confirm that read_fds is still valid to inspect when it fails. */
223 select(0, &read_fds, NULL, NULL, &tv);
/* Walk backwards so the swap-with-last removal below does not skip entries. */
226 for (
int i = active_count - 1; i >= 0; i--) {
227 if (!FD_ISSET(active[i], &read_fds)) {
/* Connection finished: close it and fill the hole with the last entry. */
233 closesocket(active[i]);
234 active[i] = active[--active_count];
235 printf(
"[ワーカー PID %lu] 接続終了 (残 %d 接続)\n",
236 GetCurrentProcessId(), active_count);
/* One byte per finished connection. NOTE(review): the WriteFile result is not checked here. */
239 WriteFile(pipe, &done, 1, &bytes_written, NULL);
/* Sends n bytes of buf back on the same socket (the matching receive is not visible in this listing). */
241 send(active[i], buf, n, 0);
250 printf(
"[ワーカー PID %lu] 終了\n", GetCurrentProcessId());
/**
 * Thread entry point that watches one worker's pipe for completion bytes.
 *
 * @param arg  Pointer to a WorkerMonitorArg; its workers, events and
 *             conns_per_worker fields are read below.
 *
 * Each successful 1-byte read decrements that worker's conn_count. If the
 * count is then below conns_per_worker, the worker's event is signalled.
 *
 * NOTE(review): the line that defines `id` is not visible here (presumably
 * taken from ctx). conn_count is also incremented by the accept loop in
 * another thread, and no locking or atomic operation is visible in this
 * listing; confirm.
 */
263static unsigned __stdcall worker_monitor_thread(
void *arg) {
264 WorkerMonitorArg *ctx = (WorkerMonitorArg *)arg;
266 WorkerInfo *workers = ctx->workers;
267 HANDLE *events = ctx->events;
268 int conns_per_worker = ctx->conns_per_worker;
/* Blocking read of a single completion byte from the worker. */
273 if (ReadFile(workers[
id].pipe, &done, 1, &bytes_read, NULL)) {
275 workers[id].conn_count--;
/* Worker has spare capacity again: wake anyone waiting on its event. */
276 if (workers[
id].conn_count < conns_per_worker) {
277 SetEvent(events[
id]);
/**
 * Finds a worker that can take another connection.
 *
 * @param workers           Array of n worker records.
 * @param events            Array of n event handles, one per worker.
 * @param n                 Number of workers.
 * @param conns_per_worker  Capacity limit compared against conn_count.
 * @return int; presumably the index of a worker with spare capacity (the
 *         return statements are not visible in this listing).
 *
 * Scans for a worker whose conn_count is below conns_per_worker. When the
 * scan finds none, it blocks with no timeout until any of the events is
 * signalled.
 *
 * NOTE(review): WaitForMultipleObjects accepts at most MAXIMUM_WAIT_OBJECTS
 * (64) handles; confirm that n is bounded accordingly by the caller.
 */
299static int find_available_worker(WorkerInfo *workers, HANDLE *events,
int n,
300 int conns_per_worker) {
302 for (
int i = 0; i < n; i++) {
303 if (workers[i].conn_count < conns_per_worker) {
/* bWaitAll is FALSE: wake as soon as any one worker's event is signalled. */
307 WaitForMultipleObjects((DWORD)n, events, FALSE, INFINITE);
/**
 * Creates the pipe, event, process and monitor thread for each prefork worker.
 *
 * @param workers           Output array of n worker records; pipe, process
 *                          and conn_count are filled in here.
 * @param events            Output array of n event handles.
 * @param n                 Number of workers to start.
 * @param conns_per_worker  Passed to each worker on its command line and to
 *                          each monitor thread.
 *
 * NOTE(review): this listing omits several original lines (for example the
 * pipe_name declaration and the error-path bodies); the comments below
 * describe only what the visible lines show.
 */
324static void start_prefork_workers(WorkerInfo *workers, HANDLE *events,
int n,
325 int conns_per_worker) {
327 char cmdline[MAX_PATH + 128];
328 char exepath[MAX_PATH];
/* Path of the running executable, reused to launch each worker. NOTE(review): the result is not checked here. */
330 GetModuleFileNameA(NULL, exepath, MAX_PATH);
/* One monitor-thread argument per worker; pointers into this array are handed to the threads below. */
333 WorkerMonitorArg *args = (WorkerMonitorArg *)malloc((
size_t)n *
sizeof(WorkerMonitorArg));
335 fprintf(stderr,
"malloc 失敗\n");
339 for (
int i = 0; i < n; i++) {
340 sprintf_s(pipe_name,
sizeof(pipe_name), TCPSERVER_PIPE_NAME_FMT, i);
/* Byte-mode, blocking pipe; at most 1 instance, 4096-byte in/out buffers. */
342 workers[i].pipe = CreateNamedPipeA(
345 PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT,
346 1, 4096, 4096, 0, NULL
349 if (workers[i].pipe == INVALID_HANDLE_VALUE) {
350 fprintf(stderr,
"パイプ作成失敗: %d\n", i);
/* Auto-reset event (bManualReset FALSE), created in the signalled state. */
356 events[i] = CreateEvent(NULL, FALSE, TRUE, NULL);
/* The worker receives its pipe name and capacity on the command line. */
359 sprintf_s(cmdline,
sizeof(cmdline),
"\"%s\" --worker %s --conns-per-worker %d",
360 exepath, pipe_name, conns_per_worker);
362 STARTUPINFOA si = {
sizeof(si)};
363 PROCESS_INFORMATION pi;
/* bInheritHandles is TRUE, so inheritable handles are passed to the worker process. */
365 if (!CreateProcessA(NULL, cmdline, NULL, NULL, TRUE, 0, NULL, NULL, &si, &pi)) {
366 fprintf(stderr,
"ワーカー起動失敗: %d\n", i);
371 workers[i].process = pi.hProcess;
372 workers[i].conn_count = 0;
/* Only the process handle is kept; the primary thread handle is not needed. */
373 CloseHandle(pi.hThread);
/* Wait for the worker to open its end of the pipe. NOTE(review): the result is not checked here. */
376 ConnectNamedPipe(workers[i].pipe, NULL);
380 args[i].workers = workers;
381 args[i].events = events;
382 args[i].conns_per_worker = conns_per_worker;
/* NOTE(review): the thread handle returned by _beginthreadex is discarded. */
383 _beginthreadex(NULL, 0, worker_monitor_thread, (
void *)&args[i], 0, NULL);
385 printf(
"[親プロセス] ワーカー %d (PID %lu) 起動完了\n", i, pi.dwProcessId);
398 if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0) {
399 fprintf(stderr,
"WSAStartup 失敗\n");
412 for (
int i = 1; i < argc - 1; i++) {
413 if (strcmp(argv[i],
"--child") == 0) {
414 SOCKET sock = (SOCKET)_strtoui64(argv[i + 1], NULL, 10);
415 run_as_fork_child(sock);
417 if (strcmp(argv[i],
"--worker") == 0) {
420 for (
int j = i + 2; j < argc - 1; j++) {
421 if (strcmp(argv[j],
"--conns-per-worker") == 0) {
422 conns_per_worker = atoi(argv[j + 1]);
440 char exepath[MAX_PATH];
441 GetModuleFileNameA(NULL, exepath, MAX_PATH);
443 printf(
"[親プロセス %lu] fork モード、ポート %d で待ち受け開始\n",
444 GetCurrentProcessId(), port);
447 SOCKET client_socket = accept(listen_socket, NULL, NULL);
448 if (client_socket == INVALID_SOCKET) {
452 printf(
"[親プロセス] 接続受付、子プロセス生成\n");
455 SetHandleInformation((HANDLE)client_socket, HANDLE_FLAG_INHERIT, HANDLE_FLAG_INHERIT);
457 char cmdline[MAX_PATH + 64];
458 sprintf_s(cmdline,
sizeof(cmdline),
"\"%s\" --child %llu",
459 exepath, (
unsigned long long)client_socket);
461 STARTUPINFOA si = {
sizeof(si)};
462 PROCESS_INFORMATION pi;
464 if (CreateProcessA(NULL, cmdline, NULL, NULL, TRUE, 0, NULL, NULL, &si, &pi)) {
465 CloseHandle(pi.hProcess);
466 CloseHandle(pi.hThread);
469 closesocket(client_socket);
475 WorkerInfo *workers = (WorkerInfo *)malloc((
size_t)num_workers *
sizeof(WorkerInfo));
476 HANDLE *events = (HANDLE *)malloc((
size_t)num_workers *
sizeof(HANDLE));
478 if (!workers || !events) {
479 fprintf(stderr,
"malloc 失敗\n");
484 printf(
"[親プロセス %lu] prefork モード、ワーカー %d 個を起動中 (1 ワーカーあたり最大 %d 接続)...\n",
485 GetCurrentProcessId(), num_workers, conns_per_worker);
486 start_prefork_workers(workers, events, num_workers, conns_per_worker);
487 printf(
"[親プロセス] ポート %d で待ち受け開始\n", port);
490 SOCKET client_socket = accept(listen_socket, NULL, NULL);
491 if (client_socket == INVALID_SOCKET) {
496 SetHandleInformation((HANDLE)client_socket, HANDLE_FLAG_INHERIT, HANDLE_FLAG_INHERIT);
498 int worker_id = find_available_worker(workers, events, num_workers, conns_per_worker);
499 workers[worker_id].conn_count++;
502 if (workers[worker_id].conn_count >= conns_per_worker) {
503 ResetEvent(events[worker_id]);
506 printf(
"[親プロセス] ワーカー %d に接続を割り当て (接続数: %d/%d)\n",
507 worker_id, workers[worker_id].conn_count, conns_per_worker);
510 WriteFile(workers[worker_id].pipe, &client_socket,
sizeof(client_socket),
511 &bytes_written, NULL);
void(* ClientSessionFn)(ClientFd fd)
セッション処理関数の型。
void platform_init(ClientSessionFn session_fn)
プラットフォーム初期化 (Windows: WSAStartup / Linux: no-op)。
int dispatch_internal_args(int argc, char *argv[])
内部起動引数を処理します。
#define DEFAULT_CONNS_PER_WORKER
デフォルト 1 ワーカーあたりの同時接続数。
void platform_cleanup(void)
プラットフォーム後処理 (Windows: WSACleanup / Linux: no-op)。
void run_prefork_server(int port, int num_workers, int conns_per_worker)
prefork モードのサーバーを起動します。
#define BUFFER_SIZE
送受信バッファサイズ (バイト)。
ClientSessionFn g_session_fn
登録済みセッション処理関数。
void run_fork_server(int port)
fork モードのサーバーを起動します。
static void worker_loop(int server_fd, int worker_id, int conns_per_worker)
ワーカープロセスのメインループ (prefork モード用)。
static int create_listen_socket(int port)
listen ソケットを作成してバインドし、待ち受けを開始します。