/*
 * KTCPVS       An implementation of the TCP Virtual Server daemon inside
 *              kernel for the LINUX operating system. KTCPVS can be used
 *              to build a moderately scalable and highly available server
 *              based on a cluster of servers, with more flexibility.
 *
 * Version:     $Id$
 *
 * Authors:     Wensong Zhang
 *
 *              This program is free software; you can redistribute it and/or
 *              modify it under the terms of the GNU General Public License
 *              as published by the Free Software Foundation; either version
 *              2 of the License, or (at your option) any later version.
 *
 */

#define __KERNEL_SYSCALLS__     /* for waitpid */

#include <linux/config.h>
#include <linux/module.h>
#include <linux/kernel.h>
#include <linux/types.h>
#include <linux/errno.h>
#include <linux/sched.h>
#include <linux/slab.h>
#include <linux/mm.h>
#include <linux/net.h>
#include <linux/skbuff.h>
#include <linux/smp_lock.h>
#include <linux/unistd.h>
#include <linux/wait.h>
#include <linux/init.h>
#include <linux/list.h>
#include <net/sock.h>
#include <net/tcp.h>

#include "tcp_vs.h"

static int errno;

atomic_t tcp_vs_daemon_count;
atomic_t tcp_vs_index;

rwlock_t __tcp_vs_lock = RW_LOCK_UNLOCKED;

EXPORT_SYMBOL(register_tcp_vs_scheduler);
EXPORT_SYMBOL(unregister_tcp_vs_scheduler);
EXPORT_SYMBOL(tcp_vs_connect2dest);
EXPORT_SYMBOL(tcp_vs_sendbuffer);
EXPORT_SYMBOL(tcp_vs_sendbuffer_async);
EXPORT_SYMBOL(tcp_vs_recvbuffer);
EXPORT_SYMBOL(tcp_vs_recvbuffer_async);
/* EXPORT_SYMBOL(tcp_vs_lookup_dest); */
EXPORT_SYMBOL(tcp_vs_getword);
EXPORT_SYMBOL(tcp_vs_getline);
EXPORT_SYMBOL(tcp_vs_exp_exec);
#ifdef CONFIG_TCP_VS_DEBUG
EXPORT_SYMBOL(tcp_vs_get_debug_level);
#endif


struct tcp_vs_conn *
tcp_vs_conn_create(struct socket *sock, char *buffer, size_t buflen)
{
        struct tcp_vs_conn *conn;

        conn = kmalloc(sizeof(*conn), GFP_KERNEL);
        if (!conn) {
                TCP_VS_ERR("create_conn no memory available\n");
                return NULL;
        }
        memset(conn, 0, sizeof(*conn));

        /* clone the socket */
        conn->csock = sock_alloc();
        if (!conn->csock) {
                kfree(conn);
                return NULL;
        }
        conn->csock->type = sock->type;
        conn->csock->ops = sock->ops;

        conn->buffer = buffer;
        conn->buflen = buflen;

        /* we probably need to assign conn->addr here!!!
           (a hypothetical sketch follows tcp_vs_conn_release below) */
        return conn;
}


int
tcp_vs_conn_release(struct tcp_vs_conn *conn)
{
        /* release the cloned socket */
        sock_release(conn->csock);

        if (conn->dest)
                atomic_dec(&conn->dest->conns);

        kfree(conn);
        return 0;
}
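
/*
 * A hedged sketch (not part of the original code) of how the TODO above
 * ("assign conn->addr") might be addressed.  It assumes conn->addr is a
 * struct sockaddr_in declared in tcp_vs.h, which is not confirmed by this
 * file, and it uses the 2.4 socket getname() operation to fetch the peer
 * address once the connection has actually been accepted (i.e. it would be
 * called after sock->ops->accept() in tcp_vs_child, not from
 * tcp_vs_conn_create itself).
 */
#if 0   /* illustrative only */
static int
tcp_vs_conn_fill_addr(struct tcp_vs_conn *conn)
{
        int addrlen = sizeof(conn->addr);

        /* getname() with peer=1 returns the remote (client) address */
        return conn->csock->ops->getname(conn->csock,
                                         (struct sockaddr *) &conn->addr,
                                         &addrlen, 1);
}
#endif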


/*
 * Handle the TCP connection between the client and tcpvs, and the one
 * between tcpvs and the selected server.  Return only when both
 * connections are done.
 */
int
tcp_vs_conn_handle(struct tcp_vs_conn *conn, struct tcp_vs_service *svc)
{
        struct socket *csock, *dsock;
        char *buffer;
        size_t buflen;
        int len;
        DECLARE_WAITQUEUE(wait1, current);
        DECLARE_WAITQUEUE(wait2, current);

        EnterFunction(5);

        buffer = conn->buffer;
        buflen = conn->buflen;

        csock = conn->csock;
        if ((csock->sk->state != TCP_ESTABLISHED)
            && (csock->sk->state != TCP_CLOSE_WAIT)) {
                TCP_VS_ERR("Error connection not established\n");
                return -1;
        }

        /*
         * Call the service's scheduler to select a server and create
         * a socket to the destination server.
         */
        dsock = svc->scheduler->schedule(conn, svc);
        if (!dsock) {
                TCP_VS_ERR("no destination available\n");
                return -1;
        }

        /*
         * NOTE: we should add a mechanism to provide a higher degree of
         *       fault tolerance here in the future; if the destination
         *       server is dead, we need to replay the request to a
         *       surviving one, and continue to provide the service to
         *       the established connection.
         *       Transaction and logging?
         *       We need to explore.
         */
        while (1) {
                /* if either connection is closed, leave the loop */
                if (dsock->sk->state != TCP_ESTABLISHED
                    && dsock->sk->state != TCP_CLOSE_WAIT)
                        break;
                if (csock->sk->state != TCP_ESTABLISHED
                    && csock->sk->state != TCP_CLOSE_WAIT)
                        break;

                /* Do we have data from the server? */
                if (!skb_queue_empty(&(dsock->sk->receive_queue))) {
                        if ((len = tcp_vs_recvbuffer(dsock,
                                                     buffer, buflen)) <= 0) {
                                break;
                        }
                        if (tcp_vs_sendbuffer(csock, buffer, len) != len) {
                                TCP_VS_ERR("Error sending buffer\n");
                        }
                }

                /* Do we have data from the client? */
                if (!skb_queue_empty(&(csock->sk->receive_queue))) {
                        if ((len = tcp_vs_recvbuffer(csock,
                                                     buffer, buflen)) <= 0) {
                                break;
                        }
                        if (tcp_vs_sendbuffer(dsock, buffer, len) != len) {
                                TCP_VS_ERR("Error sending buffer\n");
                        }
                }

                if (skb_queue_empty(&(dsock->sk->receive_queue))
                    && skb_queue_empty(&(csock->sk->receive_queue))) {
                        if (dsock->sk->state == TCP_CLOSE_WAIT
                            || csock->sk->state == TCP_CLOSE_WAIT)
                                break;

                        /*
                         * Put the current task on the sleep wait queue
                         * of both sockets, so that it is woken up as
                         * soon as either socket has data ready.
                         */
                        add_wait_queue(csock->sk->sleep, &wait1);
                        add_wait_queue(dsock->sk->sleep, &wait2);
                        __set_current_state(TASK_INTERRUPTIBLE);
                        schedule();
                        __set_current_state(TASK_RUNNING);
                        remove_wait_queue(csock->sk->sleep, &wait1);
                        remove_wait_queue(dsock->sk->sleep, &wait2);
                }
        }

        /* close the socket to the destination */
        sock_release(dsock);

        LeaveFunction(5);
        return 0;
}


static int
tcp_vs_child(void *param)
{
        struct tcp_vs_conn *conn;
        struct socket *sock;
        int error;
        char *Buffer;
        size_t BufLen = 4096;
        struct tcp_vs_service *svc = (struct tcp_vs_service *) param;
        DECLARE_WAIT_QUEUE_HEAD(queue);

        EnterFunction(3);

        atomic_inc(&svc->childcount);

        current->session = 1;
        current->pgrp = 1;
        current->state = TASK_INTERRUPTIBLE;
        sprintf(current->comm, "ktcpvs c %s", svc->ident.name);

        lock_kernel();
        exit_mm(current);

        /* Block all signals except SIGKILL and SIGSTOP */
        spin_lock_irq(&current->sigmask_lock);
        siginitsetinv(&current->blocked,
                      sigmask(SIGKILL) | sigmask(SIGSTOP));
        recalc_sigpending(current);
        spin_unlock_irq(&current->sigmask_lock);

        sock = svc->mainsock;
        if (sock == NULL) {
                TCP_VS_ERR("%s's socket is NULL\n", svc->ident.name);
                return 0;
        }

        error = 0;
        Buffer = (char *) get_free_page(GFP_KERNEL);
        if (Buffer == NULL)
                return 1;

        while (svc->stop == 0 && sysctl_ktcpvs_unload == 0) {
                /*
                 * If there are too many children, break out of the
                 * loop and terminate ??????
                 */
                if (atomic_read(&svc->conns) > svc->conf.maxClients)
                        break;

                if (sock->sk->tp_pinfo.af_tcp.accept_queue == NULL) {
                        interruptible_sleep_on_timeout(&queue, HZ);
                        continue;
                }

                /* create a tcp_vs_conn object */
                conn = tcp_vs_conn_create(sock, Buffer, BufLen);
                if (!conn)
                        break;

                /* do the actual accept */
                error = sock->ops->accept(sock, conn->csock, 0);
                if (error < 0) {
                        TCP_VS_ERR("Error accepting socket\n");
                        /* release the conn so the cloned socket is not leaked */
                        tcp_vs_conn_release(conn);
                        break;
                }

                atomic_inc(&svc->conns);

                /* do the work */
                error = tcp_vs_conn_handle(conn, svc);
                if (error < 0) {
                        TCP_VS_ERR("Error handling connection\n");
                }

                /* release the tcp_vs_conn */
                tcp_vs_conn_release(conn);
                atomic_dec(&svc->conns);

                if (signal_pending(current) != 0) {
                        TCP_VS_INFO("Ring Ring - signal received\n");
                        break;
                }
        }

        free_page((unsigned long) Buffer);
        atomic_dec(&svc->childcount);

        LeaveFunction(3);
        return 0;
}
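
/*
 * A hedged sketch (not part of the original code): the accept loop in
 * tcp_vs_child() above polls sock->sk->tp_pinfo.af_tcp.accept_queue once
 * per second.  The same check could instead block on the listening
 * socket's wait queue, in the style of the relay loop in
 * tcp_vs_conn_handle(), so that the child wakes up as soon as a
 * connection request arrives or the timeout expires.
 */
#if 0   /* illustrative only */
static void
tcp_vs_wait_for_connection(struct socket *sock, long timeout)
{
        DECLARE_WAITQUEUE(wait, current);

        add_wait_queue(sock->sk->sleep, &wait);
        __set_current_state(TASK_INTERRUPTIBLE);
        if (sock->sk->tp_pinfo.af_tcp.accept_queue == NULL)
                schedule_timeout(timeout);
        __set_current_state(TASK_RUNNING);
        remove_wait_queue(sock->sk->sleep, &wait);
}
#endif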


static int
tcp_vs_daemon(void *param)
{
        int waitpid_result;
        int i;
        struct tcp_vs_service *svc = (struct tcp_vs_service *) param;
        DECLARE_WAIT_QUEUE_HEAD(WQ);

        atomic_inc(&tcp_vs_daemon_count);

        sprintf(current->comm, "ktcpvs d %s", svc->ident.name);

        lock_kernel();
        /* This seems to be required for exit_mm */
        exit_mm(current);

        /* Block all signals except SIGKILL and SIGSTOP */
        spin_lock_irq(&current->sigmask_lock);
        siginitsetinv(&current->blocked,
                      sigmask(SIGKILL) | sigmask(SIGSTOP));
        recalc_sigpending(current);
        spin_unlock_irq(&current->sigmask_lock);

        if (!svc->scheduler) {
                TCP_VS_ERR("%s's scheduler is not bound\n", svc->ident.name);
                svc->start = 0;
                return 0;
        }

        /* start listening, then spawn the child daemons */
        if (StartListening(svc) < 0) {
                svc->start = 0;
                return 0;
        }

        atomic_set(&svc->running, 1);
        atomic_set(&svc->childcount, 0);
        svc->stop = 0;

        for (i = 0; i < svc->conf.startservers; i++)
                (void) kernel_thread(tcp_vs_child, svc, 0);

        /* then wait for deactivation */
        while (svc->stop == 0 && !signal_pending(current)
               && sysctl_ktcpvs_unload == 0) {
                /*
                 * We need a mechanism to keep the number of spare
                 * servers within [minSpareServers, maxSpareServers]
                 * here (see the hypothetical sketch at the end of
                 * this file).
                 *
                 * More work needed here!!!!!!!
                 */
                current->state = TASK_INTERRUPTIBLE;
                interruptible_sleep_on_timeout(&WQ, HZ);

                /* reap the child daemons */
                waitpid_result = waitpid(-1, NULL, __WCLONE | WNOHANG);
        }

        /* wait for the tcp_vs_child threads to stop, one second per iteration */
        while (atomic_read(&svc->childcount) > 0)
                interruptible_sleep_on_timeout(&WQ, HZ);

        /* reap the zombie daemons */
        waitpid_result = 1;
        while (waitpid_result > 0)
                waitpid_result = waitpid(-1, NULL, __WCLONE | WNOHANG);

        /* stop listening */
        StopListening(svc);

        svc->start = 0;
        atomic_set(&svc->running, 0);
        atomic_dec(&tcp_vs_daemon_count);
        return 0;
}


static int
master_daemon(void *unused)
{
        int waitpid_result;
        struct list_head *l;
        struct tcp_vs_service *svc;
        DECLARE_WAIT_QUEUE_HEAD(WQ);

        MOD_INC_USE_COUNT;

        sprintf(current->comm, "ktcpvs master");

        lock_kernel();
        /* This seems to be required for exit_mm */
        exit_mm(current);

        /* Block all signals except SIGKILL and SIGSTOP */
        spin_lock_irq(&current->sigmask_lock);
        siginitsetinv(&current->blocked,
                      sigmask(SIGKILL) | sigmask(SIGSTOP));
        recalc_sigpending(current);
        spin_unlock_irq(&current->sigmask_lock);

        /* main loop */
        while (sysctl_ktcpvs_unload == 0) {
                list_for_each(l, &tcp_vs_list) {
                        svc = list_entry(l, struct tcp_vs_service, list);
                        if (!atomic_read(&svc->running) && svc->start)
                                kernel_thread(tcp_vs_daemon, svc, 0);
                }

                if (signal_pending(current))
                        break;

                current->state = TASK_INTERRUPTIBLE;
                interruptible_sleep_on_timeout(&WQ, HZ);

                /* reap the daemons */
                waitpid_result = waitpid(-1, NULL, __WCLONE | WNOHANG);
        }

        /* wait for the tcp_vs daemons to stop, one second per iteration */
        while (atomic_read(&tcp_vs_daemon_count) > 0)
                interruptible_sleep_on_timeout(&WQ, HZ);

        /* reap the zombie daemons */
        waitpid_result = 1;
        while (waitpid_result > 0)
                waitpid_result = waitpid(-1, NULL, __WCLONE | WNOHANG);

        /* flush all the virtual servers */
        tcp_vs_flush();

        TCP_VS_INFO("master daemon stopped.\n"
                    "You can unload the module now.\n");

        MOD_DEC_USE_COUNT;
        return 0;
}


static int __init
ktcpvs_init(void)
{
        atomic_set(&tcp_vs_daemon_count, 0);
        atomic_set(&tcp_vs_index, 1);

        tcp_vs_control_start();

        (void) kernel_thread(master_daemon, NULL, 0);

        TCP_VS_INFO("ktcpvs loaded.\n");
        return 0;
}


static void __exit
ktcpvs_cleanup(void)
{
        tcp_vs_control_stop();
        TCP_VS_INFO("ktcpvs unloaded.\n");
}

module_init(ktcpvs_init);
module_exit(ktcpvs_cleanup);
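
/*
 * A hedged sketch (not part of the original code) of the spare-server
 * management that the TODO in tcp_vs_daemon() asks for.  It assumes that
 * svc->conf carries minSpareServers and maxSpareServers fields next to
 * startservers/maxClients, which this file does not confirm, and it treats
 * a child that is not handling a connection as "spare"
 * (childcount - conns).  Growing the pool is straightforward; shrinking it
 * would additionally need a per-child exit flag that the current
 * tcp_vs_child() accept loop does not have, so only the growing half is
 * sketched.  Such a check could be called once per second from the
 * deactivation loop in tcp_vs_daemon().
 */
#if 0   /* illustrative only */
static void
tcp_vs_keep_spare_servers(struct tcp_vs_service *svc)
{
        int spares = atomic_read(&svc->childcount)
                     - atomic_read(&svc->conns);

        /* spawn additional children until minSpareServers idle ones exist */
        while (spares < svc->conf.minSpareServers
               && atomic_read(&svc->childcount) < svc->conf.maxClients) {
                if (kernel_thread(tcp_vs_child, svc, 0) < 0)
                        break;
                spares++;
        }

        /*
         * If spares exceeds svc->conf.maxSpareServers, surplus children
         * should be asked to exit, e.g. via a per-service or per-child
         * flag checked in tcp_vs_child()'s accept loop.
         */
}
#endif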