/*
 * KTCPVS       An implementation of the TCP Virtual Server daemon inside
 *              kernel for the LINUX operating system. KTCPVS can be used
 *              to build a moderately scalable and highly available server
 *              based on a cluster of servers, with more flexibility.
 *
 * Version:     $Id$
 *
 * Authors:     Wensong Zhang
 *
 *              This program is free software; you can redistribute it and/or
 *              modify it under the terms of the GNU General Public License
 *              as published by the Free Software Foundation; either version
 *              2 of the License, or (at your option) any later version.
 *
 */

#define __KERNEL_SYSCALLS__	/* for waitpid */

#include <linux/config.h>
#include <linux/module.h>
#include <linux/types.h>
#include <linux/errno.h>
#include <linux/kernel.h>
#include <linux/sched.h>
#include <linux/slab.h>
#include <linux/net.h>
#include <linux/unistd.h>
#include <linux/wait.h>
#include <linux/signal.h>
#include <linux/smp_lock.h>
#include <linux/vmalloc.h>
#include <linux/init.h>
#include <net/sock.h>
#include <net/tcp.h>
#include <asm/uaccess.h>

#include "tcp_vs.h"

static int errno;
static atomic_t tcp_vs_daemon_count = ATOMIC_INIT(0);

EXPORT_SYMBOL(register_tcp_vs_scheduler);
EXPORT_SYMBOL(unregister_tcp_vs_scheduler);
EXPORT_SYMBOL(tcp_vs_connect2dest);
EXPORT_SYMBOL(tcp_vs_sendbuffer);
EXPORT_SYMBOL(tcp_vs_sendbuffer_async);
EXPORT_SYMBOL(tcp_vs_recvbuffer);
EXPORT_SYMBOL(tcp_vs_recvbuffer_async);
/* EXPORT_SYMBOL(tcp_vs_lookup_dest); */
EXPORT_SYMBOL(tcp_vs_getword);
EXPORT_SYMBOL(tcp_vs_getline);
EXPORT_SYMBOL(tcp_vs_exp_exec);
#ifdef CONFIG_TCP_VS_DEBUG
EXPORT_SYMBOL(tcp_vs_get_debug_level);
#endif
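/*
 * Thread structure of this module:
 *
 *   master_daemon()  - single kernel thread started at module load; it
 *                      walks tcp_vs_svc_list and spawns a tcp_vs_daemon()
 *                      thread for every virtual service marked to start.
 *   tcp_vs_daemon()  - per-service thread; it starts listening on the
 *                      service socket and manages a pool of tcp_vs_child()
 *                      threads, reaping them as they exit.
 *   tcp_vs_child()   - worker thread; it accepts client connections and
 *                      relays data between the client and the real server
 *                      selected by the service's scheduler.
 */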
/*
 * Allocate a tcp_vs_conn object for an incoming client connection and
 * clone the listening socket, so that accept() has a socket to fill in.
 */
struct tcp_vs_conn *
tcp_vs_conn_create(struct socket *sock, char *buffer, size_t buflen)
{
	struct tcp_vs_conn *conn;

	conn = kmalloc(sizeof(*conn), GFP_KERNEL);
	if (!conn) {
		TCP_VS_ERR("create_conn no memory available\n");
		return NULL;
	}
	memset(conn, 0, sizeof(*conn));

	/* clone the socket */
	conn->csock = sock_alloc();
	if (!conn->csock) {
		kfree(conn);
		return NULL;
	}
	conn->csock->type = sock->type;
	conn->csock->ops = sock->ops;

	conn->buffer = buffer;
	conn->buflen = buflen;

	/* we probably need to assign conn->addr here!!! */
	return conn;
}

int
tcp_vs_conn_release(struct tcp_vs_conn *conn)
{
	/* release the cloned socket */
	sock_release(conn->csock);

	if (conn->dest)
		atomic_dec(&conn->dest->conns);

	kfree(conn);
	return 0;
}

/*
 * Handle the TCP connection between the client and the tcpvs, and the
 * one between the tcpvs and the selected server; return when both
 * connections are done.
 */
int
tcp_vs_conn_handle(struct tcp_vs_conn *conn, struct tcp_vs_service *svc)
{
	struct socket *csock, *dsock;
	char *buffer;
	size_t buflen;
	int len;
	DECLARE_WAITQUEUE(wait1, current);
	DECLARE_WAITQUEUE(wait2, current);

	EnterFunction(5);

	buffer = conn->buffer;
	buflen = conn->buflen;
	csock = conn->csock;
	if ((csock->sk->state != TCP_ESTABLISHED)
	    && (csock->sk->state != TCP_CLOSE_WAIT)) {
		TCP_VS_ERR("Error: connection not established\n");
		return -1;
	}

	/* call the service's scheduler to select a server and create a
	   socket to the destination server */
	dsock = svc->scheduler->schedule(conn, svc);
	if (!dsock) {
		TCP_VS_ERR_RL("no destination available\n");
		return -1;
	}

	/*
	 * NOTE: we should add a mechanism to provide a higher degree of
	 *       fault tolerance here in the future. If the destination
	 *       server is dead, we need to replay the request to a
	 *       surviving one, and continue to provide the service to
	 *       the established connection.
	 *       Transaction and logging?
	 *       We need to explore.
	 */
	while (1) {
		/* if either connection is closed, get out of this loop */
		if (dsock->sk->state != TCP_ESTABLISHED
		    && dsock->sk->state != TCP_CLOSE_WAIT)
			break;
		if (csock->sk->state != TCP_ESTABLISHED
		    && csock->sk->state != TCP_CLOSE_WAIT)
			break;

		/* Do we have data from the server? */
		if (!skb_queue_empty(&(dsock->sk->receive_queue))) {
			if ((len = tcp_vs_recvbuffer(dsock,
						     buffer, buflen)) <= 0)
				break;
			if (tcp_vs_sendbuffer(csock, buffer, len) != len)
				TCP_VS_ERR_RL("Error sending buffer\n");
		}

		/* Do we have data from the client? */
		if (!skb_queue_empty(&(csock->sk->receive_queue))) {
			if ((len = tcp_vs_recvbuffer(csock,
						     buffer, buflen)) <= 0)
				break;
			if (tcp_vs_sendbuffer(dsock, buffer, len) != len)
				TCP_VS_ERR_RL("Error sending buffer\n");
		}

		if (skb_queue_empty(&(dsock->sk->receive_queue))
		    && skb_queue_empty(&(csock->sk->receive_queue))) {
			if (dsock->sk->state == TCP_CLOSE_WAIT
			    || csock->sk->state == TCP_CLOSE_WAIT)
				break;

			/*
			 * Put the current task on the sleep wait queue
			 * of both sockets, and wake up the task if one
			 * socket has some data ready.
			 */
			add_wait_queue(csock->sk->sleep, &wait1);
			add_wait_queue(dsock->sk->sleep, &wait2);
			__set_current_state(TASK_INTERRUPTIBLE);
			schedule();
			__set_current_state(TASK_RUNNING);
			remove_wait_queue(csock->sk->sleep, &wait1);
			remove_wait_queue(dsock->sk->sleep, &wait2);
		}
	}

	/* close the socket to the destination */
	sock_release(dsock);

	LeaveFunction(5);
	return 0;
}

enum {
	SERVER_DEAD = 0,
	SERVER_STARTING,
	SERVER_READY,
	SERVER_BUSY
};

#ifndef MAX_SPAWN_RATE
#define MAX_SPAWN_RATE	32
#endif

struct tcp_vs_child_table {
	struct tcp_vs_child children[KTCPVS_CHILD_HARD_LIMIT];
	int max_daemons_limit;
	int idle_spawn_rate;
	unsigned long last_modified;	/* last time of adding/killing a child */
};

static int tcp_vs_child(void *__child);

static inline void
make_child(struct tcp_vs_child_table *tbl, int slot,
	   struct tcp_vs_service *svc)
{
	if (slot + 1 > tbl->max_daemons_limit)
		tbl->max_daemons_limit = slot + 1;
	tbl->last_modified = jiffies;
	tbl->children[slot].svc = svc;
	if (kernel_thread(tcp_vs_child, &tbl->children[slot], 0) < 0)
		TCP_VS_ERR("spawn child failed\n");
}

static inline void
kill_child(struct tcp_vs_child_table *tbl, int slot)
{
	kill_proc(tbl->children[slot].pid, SIGKILL, 1);
	tbl->last_modified = jiffies;
}

static inline void
update_child_status(struct tcp_vs_child *chd, __u16 status)
{
	chd->status = status;
}
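/*
 * Dynamic management of the child pool, in the style of Apache's prefork
 * model: count the idle (SERVER_READY) children, kill one spare child when
 * there are more than maxSpareServers, and spawn a batch of up to
 * idle_spawn_rate children when there are fewer than minSpareServers.
 * The spawn rate doubles (up to MAX_SPAWN_RATE) while the pool stays short
 * of idle children, and drops back to one otherwise.
 */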
static inline void
child_pool_maintenance(struct tcp_vs_child_table *tbl,
		       struct tcp_vs_service *svc)
{
	int i;
	int free_slots[MAX_SPAWN_RATE];
	int free_length = 0;
	int to_kill = -1;
	int idle_count = 0;
	int last_non_dead = -1;

	for (i = 0; i < svc->conf.maxClients; i++) {
		int status;

		if (i >= tbl->max_daemons_limit
		    && free_length == tbl->idle_spawn_rate)
			break;
		status = tbl->children[i].status;
		switch (status) {
		case SERVER_DEAD:
			if (free_length < tbl->idle_spawn_rate) {
				free_slots[free_length] = i;
				free_length++;
			}
			break;
		case SERVER_STARTING:
		case SERVER_BUSY:
			last_non_dead = i;
			break;
		case SERVER_READY:
			idle_count++;
			to_kill = i;
			last_non_dead = i;
			break;
		}
	}
	tbl->max_daemons_limit = last_non_dead + 1;

	if (idle_count > svc->conf.maxSpareServers) {
		/* kill one child each time */
		kill_child(tbl, to_kill);
		tbl->idle_spawn_rate = 1;
	} else if (idle_count < svc->conf.minSpareServers) {
		if (free_length) {
			if (tbl->idle_spawn_rate > 8 && net_ratelimit())
				TCP_VS_INFO("Server %s seems busy, you may "
					    "need to increase StartServers, "
					    "or Min/MaxSpareServers\n",
					    svc->ident.name);
			/* spawn a batch of children */
			for (i = 0; i < free_length; i++)
				make_child(tbl, free_slots[i], svc);

			if (tbl->idle_spawn_rate < MAX_SPAWN_RATE)
				tbl->idle_spawn_rate *= 2;
		} else if (net_ratelimit())
			TCP_VS_INFO("Server %s reached MaxClients setting, "
				    "consider raising the MaxClients "
				    "setting\n", svc->ident.name);
	} else {
		/*
		 * If the number of spare servers remains in the interval
		 * (minSpareServers, maxSpareServers] and the table has not
		 * been modified for more than ten minutes, try to kill one
		 * spare child in order to release some resources.
		 */
		if (idle_count > svc->conf.minSpareServers
		    && jiffies - tbl->last_modified > 600 * HZ)
			kill_child(tbl, to_kill);
		tbl->idle_spawn_rate = 1;
	}
}

static int
tcp_vs_child(void *__child)
{
	struct tcp_vs_conn *conn;
	struct socket *sock;
	int ret = 0;
	char *Buffer;
	size_t BufLen = 4096;
	struct tcp_vs_child *chd = (struct tcp_vs_child *) __child;
	struct tcp_vs_service *svc = chd->svc;
	DECLARE_WAIT_QUEUE_HEAD(queue);

	EnterFunction(3);

	atomic_inc(&svc->childcount);
	chd->pid = current->pid;
	update_child_status(chd, SERVER_STARTING);

	sprintf(current->comm, "ktcpvs c %s", svc->ident.name);

	lock_kernel();
	daemonize();

	/* Block all signals except SIGKILL and SIGSTOP */
	spin_lock_irq(&current->sigmask_lock);
	siginitsetinv(&current->blocked,
		      sigmask(SIGKILL) | sigmask(SIGSTOP));
	recalc_sigpending(current);
	spin_unlock_irq(&current->sigmask_lock);

	sock = svc->mainsock;
	if (sock == NULL) {
		TCP_VS_ERR("%s's socket is NULL\n", svc->ident.name);
		ret = -1;
		goto out;
	}

	Buffer = (char *) get_free_page(GFP_KERNEL);
	if (Buffer == NULL) {
		ret = -ENOMEM;
		goto out;
	}

	while (svc->stop == 0 && sysctl_ktcpvs_unload == 0) {
		if (signal_pending(current)) {
			TCP_VS_DBG(3, "child (pid=%d): signal received\n",
				   current->pid);
			break;
		}

		if (sock->sk->tp_pinfo.af_tcp.accept_queue == NULL) {
			update_child_status(chd, SERVER_READY);
			interruptible_sleep_on_timeout(&queue, HZ);
			continue;
		}

		update_child_status(chd, SERVER_BUSY);

		/* create a tcp_vs_conn object */
		conn = tcp_vs_conn_create(sock, Buffer, BufLen);
		if (!conn)
			break;

		/* do the actual accept */
		ret = sock->ops->accept(sock, conn->csock, 0);
		if (ret < 0) {
			TCP_VS_ERR("Error accepting socket\n");
			tcp_vs_conn_release(conn);
			break;
		}
		atomic_inc(&svc->conns);

		/* do the work */
		ret = tcp_vs_conn_handle(conn, svc);
		if (ret < 0) {
			TCP_VS_ERR_RL("Error handling connection\n");
			tcp_vs_conn_release(conn);
			atomic_dec(&svc->conns);
			break;
		}

		/* release the tcp_vs_conn */
		tcp_vs_conn_release(conn);
		atomic_dec(&svc->conns);
	}
	free_page((unsigned long) Buffer);

  out:
	update_child_status(chd, SERVER_DEAD);
	atomic_dec(&svc->childcount);
	LeaveFunction(3);
	return 0;
}
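/*
 * Per-service daemon: start listening on the service's socket, spawn the
 * initial set of children (startservers), then loop once per second to
 * maintain the child pool and reap exited children, until the service is
 * stopped or the module is being unloaded.
 */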
static int
tcp_vs_daemon(void *__svc)
{
	int waitpid_result;
	int i;
	struct tcp_vs_service *svc = (struct tcp_vs_service *) __svc;
	struct tcp_vs_child_table *child_table;
	DECLARE_WAIT_QUEUE_HEAD(WQ);

	atomic_inc(&tcp_vs_daemon_count);

	sprintf(current->comm, "ktcpvs d %s", svc->ident.name);

	lock_kernel();
	daemonize();

	/* Block all signals except SIGKILL and SIGSTOP */
	spin_lock_irq(&current->sigmask_lock);
	siginitsetinv(&current->blocked,
		      sigmask(SIGKILL) | sigmask(SIGSTOP));
	recalc_sigpending(current);
	spin_unlock_irq(&current->sigmask_lock);

	if (!svc->scheduler) {
		TCP_VS_ERR("%s's scheduler is not bound\n", svc->ident.name);
		goto out;
	}

	child_table = vmalloc(sizeof(*child_table));
	if (!child_table)
		goto out;

	/* then start listening and spawn the child daemons */
	if (StartListening(svc) < 0) {
		vfree(child_table);
		goto out;
	}

	atomic_set(&svc->running, 1);
	atomic_set(&svc->childcount, 0);
	svc->stop = 0;

	memset(child_table, 0, sizeof(*child_table));
	child_table->idle_spawn_rate = 1;
	for (i = 0; i < svc->conf.startservers; i++)
		make_child(child_table, i, svc);

	/* then wait for deactivation */
	while (svc->stop == 0 && !signal_pending(current)
	       && sysctl_ktcpvs_unload == 0) {
		current->state = TASK_INTERRUPTIBLE;
		interruptible_sleep_on_timeout(&WQ, HZ);

		/* dynamically keep enough children to handle the load */
		child_pool_maintenance(child_table, svc);

		/* reap the daemons */
		waitpid_result = waitpid(-1, NULL, __WCLONE | WNOHANG);
	}

	/* wait for the tcp_vs_child threads to stop, one second per iteration */
	while (atomic_read(&svc->childcount) > 0)
		interruptible_sleep_on_timeout(&WQ, HZ);

	/* reap the zombie daemons */
	waitpid_result = 1;
	while (waitpid_result > 0)
		waitpid_result = waitpid(-1, NULL, __WCLONE | WNOHANG);

	/* stop listening and free the child table */
	StopListening(svc);
	vfree(child_table);

  out:
	svc->start = 0;
	atomic_set(&svc->running, 0);
	atomic_dec(&tcp_vs_daemon_count);
	return 0;
}

static int
master_daemon(void *unused)
{
	int waitpid_result;
	struct list_head *l;
	struct tcp_vs_service *svc;
	DECLARE_WAIT_QUEUE_HEAD(WQ);

	MOD_INC_USE_COUNT;

	sprintf(current->comm, "ktcpvs master");

	lock_kernel();
	/* this seems to be required for exit_mm */
	exit_mm(current);

	/* Block all signals except SIGKILL and SIGSTOP */
	spin_lock_irq(&current->sigmask_lock);
	siginitsetinv(&current->blocked,
		      sigmask(SIGKILL) | sigmask(SIGSTOP));
	recalc_sigpending(current);
	spin_unlock_irq(&current->sigmask_lock);

	/* main loop */
	while (sysctl_ktcpvs_unload == 0) {
		/* start a tcp_vs_daemon for every service that is marked
		   to start but is not running yet */
		read_lock(&__tcp_vs_svc_lock);
		list_for_each(l, &tcp_vs_svc_list) {
			svc = list_entry(l, struct tcp_vs_service, list);
			if (!atomic_read(&svc->running) && svc->start)
				kernel_thread(tcp_vs_daemon, svc, 0);
		}
		read_unlock(&__tcp_vs_svc_lock);

		if (signal_pending(current))
			break;

		current->state = TASK_INTERRUPTIBLE;
		interruptible_sleep_on_timeout(&WQ, HZ);

		/* reap the daemons */
		waitpid_result = waitpid(-1, NULL, __WCLONE | WNOHANG);
	}

	/* wait for the tcp_vs daemons to stop, one second per iteration */
	while (atomic_read(&tcp_vs_daemon_count) > 0)
		interruptible_sleep_on_timeout(&WQ, HZ);

	/* reap the zombie daemons */
	waitpid_result = 1;
	while (waitpid_result > 0)
		waitpid_result = waitpid(-1, NULL, __WCLONE | WNOHANG);

	/* flush all the virtual servers */
	tcp_vs_flush();

	TCP_VS_INFO("The master daemon stopped.\n"
		    "You can unload the module now.\n");
	MOD_DEC_USE_COUNT;
	return 0;
}

static int __init
ktcpvs_init(void)
{
	tcp_vs_control_start();

	(void) kernel_thread(master_daemon, NULL, 0);

	TCP_VS_INFO("ktcpvs loaded.\n");
	return 0;
}

static void __exit
ktcpvs_cleanup(void)
{
	tcp_vs_control_stop();
	TCP_VS_INFO("ktcpvs unloaded.\n");
}

module_init(ktcpvs_init);
module_exit(ktcpvs_cleanup);