/* * KTCPVS An implementation of the TCP Virtual Server daemon inside * kernel for the LINUX operating system. KTCPVS can be used * to build a moderately scalable and highly available server * based on a cluster of servers, with more flexibility. * * Version: $Id$ * * Authors: Wensong Zhang * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public License * as published by the Free Software Foundation; either version * 2 of the License, or (at your option) any later version. * */ #define __KERNEL_SYSCALLS__ /* for waitpid */ #ifdef MODULE #define EXPORT_SYMTAB #endif #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "tcp_vs.h" static int errno; atomic_t tcp_vs_daemon_count; atomic_t tcp_vs_index; struct list_head tcp_vs_list; rwlock_t __tcp_vs_lock = RW_LOCK_UNLOCKED; EXPORT_SYMBOL(register_tcp_vs_scheduler); EXPORT_SYMBOL(unregister_tcp_vs_scheduler); EXPORT_SYMBOL(tcp_vs_connect2dest); EXPORT_SYMBOL(tcp_vs_sendbuffer); EXPORT_SYMBOL(tcp_vs_sendbuffer_async); EXPORT_SYMBOL(tcp_vs_recvbuffer); EXPORT_SYMBOL(tcp_vs_recvbuffer_async); EXPORT_SYMBOL(tcp_vs_lookup_dest); EXPORT_SYMBOL(tcp_vs_getword); EXPORT_SYMBOL(tcp_vs_getline); struct tcp_vs_conn * tcp_vs_conn_create(struct socket *sock, char *buffer, size_t buflen) { struct tcp_vs_conn *conn; conn = kmalloc(sizeof(*conn), GFP_KERNEL); if (!conn) { TCP_VS_ERR("create_conn no memory available\n"); return NULL; } memset(conn, 0, sizeof(*conn)); /* clone the socket */ conn->csock = sock_alloc(); if (!conn->csock) { kfree(conn); return NULL; } conn->csock->type = sock->type; conn->csock->ops = sock->ops; conn->buffer = buffer; conn->buflen = buflen; /* we probably need assign conn->addr here!!! */ return conn; } int tcp_vs_conn_release(struct tcp_vs_conn *conn) { /* release the cloned socket */ sock_release(conn->csock); atomic_dec(&conn->dest->conns); kfree(conn); return 0; } /* * Handle TCP connection between client and the tcpvs, and the one * between the tcpvs and the selected server. Terminate until that * the two connections are done. */ int tcp_vs_conn_handle(struct tcp_vs_conn *conn, struct tcp_vs *vs) { struct socket *csock, *dsock; char *buffer; size_t buflen; int len; DECLARE_WAITQUEUE(wait1, current); DECLARE_WAITQUEUE(wait2, current); EnterFunction("tcp_vs_conn_handle"); buffer = conn->buffer; buflen = conn->buflen; csock = conn->csock; if ((csock->sk->state != TCP_ESTABLISHED) && (csock->sk->state != TCP_CLOSE_WAIT) ) { TCP_VS_ERR("Error connection not established\n"); return -1; } /* call its scheduler to select a server and create a socket to the destination server */ dsock = vs->scheduler->schedule(conn, vs); if (!dsock) { TCP_VS_ERR("no destination available\n"); return -1; } /* * NOTE: we should add a mechanism to provide higher degree of * fault-tolerance here in the future, if the destination * server is dead, we need replay the request to a * surviving one, and continue to provide the service to * the established connection. * Transaction and Logging? * We need to explore. */ while (1) { /* if the connection is closed, go out of this loop */ if (dsock->sk->state!=TCP_ESTABLISHED && dsock->sk->state!=TCP_CLOSE_WAIT) break; if (csock->sk->state!=TCP_ESTABLISHED && csock->sk->state!=TCP_CLOSE_WAIT) break; /* Do we have data from server? */ if(!skb_queue_empty(&(dsock->sk->receive_queue))) { if ((len=tcp_vs_recvbuffer(dsock, buffer, buflen))<=0) { break; } if (tcp_vs_sendbuffer(csock, buffer, len)!=len) { TCP_VS_ERR("Error sending buffer\n"); } } else { if (dsock->sk->state==TCP_CLOSE_WAIT) break; } /* Do we have data from client? */ if(!skb_queue_empty(&(csock->sk->receive_queue))) { if ((len=tcp_vs_recvbuffer(csock, buffer, buflen))<=0) { break; } if (tcp_vs_sendbuffer(dsock, buffer, len)!=len) { TCP_VS_ERR("Error sending buffer\n"); } } else { if (csock->sk->state==TCP_CLOSE_WAIT) break; } /* * Put the current task on the sleep wait queue of both * the sockets, wake up the task if one socket has some * data ready. */ __set_task_state(current, TASK_INTERRUPTIBLE); add_wait_queue(csock->sk->sleep, &wait1); add_wait_queue(dsock->sk->sleep, &wait2); TCP_VS_DBG("sleep for 5 HZs\n"); schedule_timeout(5*HZ); __set_task_state(current, TASK_RUNNING); remove_wait_queue(csock->sk->sleep, &wait1); remove_wait_queue(dsock->sk->sleep, &wait2); } /* close the socket to the destination */ sock_release(dsock); LeaveFunction("tcp_vs_conn_handle"); return 0; } static int tcp_vs_child(void *param) { sigset_t tmpsig; struct tcp_vs_conn *conn; struct socket *sock; int error; char *Buffer; size_t BufLen=4096; struct tcp_vs *vs = (struct tcp_vs *)param; DECLARE_WAIT_QUEUE_HEAD(queue); EnterFunction("tcp_vs_child"); atomic_inc(&vs->childcount); current->session = 1; current->pgrp = 1; current->state |= TASK_EXCLUSIVE; sprintf(current->comm,"ktcpvs c %s", vs->name); lock_kernel(); /* This seems to be required for exit_mm */ exit_mm(current); /* Block all signals except SIGKILL and SIGSTOP */ spin_lock_irq(¤t->sigmask_lock); tmpsig = current->blocked; siginitsetinv(¤t->blocked, sigmask(SIGKILL) | sigmask(SIGSTOP) ); recalc_sigpending(current); spin_unlock_irq(¤t->sigmask_lock); sock = vs->mainsock; if (sock == NULL) { TCP_VS_ERR("%s's socket is NULL\n", vs->name); return 0; } /* sk = sock->sk; */ /* sk->nonagle = 1; */ /* sk->linger = 1; */ error=0; Buffer = (char*) get_free_page(GFP_KERNEL); if (Buffer == NULL) return 1; while (vs->stop == 0) { /* If there are to many children, break out the loop and terminate */ if (atomic_read(&vs->conns) > vs->maxClients) break; if (sock->sk->tp_pinfo.af_tcp.accept_queue == NULL) { interruptible_sleep_on_timeout(&queue,HZ); continue; } /* create tcp_vs_conn object */ conn = tcp_vs_conn_create(sock, Buffer, BufLen); if (!conn) break; /* Do the actual accept */ error = sock->ops->accept(sock, conn->csock, 0); if (error<0) { TCP_VS_ERR("Error accepting socket\n") ; break; } atomic_inc(&vs->conns); /* Do the work */ error=tcp_vs_conn_handle(conn, vs); if (error<0) { TCP_VS_ERR("Error handling connection\n") ; }; /* release tcp_vs_conn */ tcp_vs_conn_release(conn); atomic_dec(&vs->conns); if (signal_pending(current)!=0) { TCP_VS_INFO("Ring Ring - signal received\n"); break; } } free_page((unsigned long)Buffer); atomic_dec(&vs->childcount); LeaveFunction("tcp_vs_child"); return 0; } static int tcp_vs_daemon(void *param) { sigset_t tmpsig; int waitpid_result; int i; struct tcp_vs *vs = (struct tcp_vs *)param; DECLARE_WAIT_QUEUE_HEAD(WQ); atomic_inc(&tcp_vs_daemon_count); sprintf(current->comm,"ktcpvs d %s", vs->name); lock_kernel(); /* This seems to be required for exit_mm */ exit_mm(current); /* Block all signals except SIGKILL and SIGSTOP */ spin_lock_irq(¤t->sigmask_lock); tmpsig = current->blocked; siginitsetinv(¤t->blocked, sigmask(SIGKILL) | sigmask(SIGSTOP) ); recalc_sigpending(current); spin_unlock_irq(¤t->sigmask_lock); if (!vs->scheduler) { TCP_VS_ERR("%s's scheduler is not bound\n", vs->name); vs->start = 0; return 0; } /* Then start listening and spawn the daemons */ if (StartListening(vs)==0) { vs->start = 0; return 0; } atomic_set(&vs->running, 1); atomic_set(&vs->childcount, 0); for (i=0; istartservers; i++) { (void)kernel_thread(tcp_vs_child, vs, 0); } /* Then wait for deactivation */ vs->stop = 0; while (vs->stop==0 && !signal_pending(current)) /* && sysctl_ktcpvs_unload==0 */ { /* * We need to implement mechanism to keep the number of * servers in [minSpareServers, maxSpareServers] here. * * More work needed here!!!!!!! */ current->state = TASK_INTERRUPTIBLE; interruptible_sleep_on_timeout(&WQ, HZ); /* reap the daemons */ waitpid_result = waitpid(-1,NULL,__WCLONE|WNOHANG); } /* Wait for tcp_vs_child to stop, one second per iteration */ while (atomic_read(&vs->childcount) > 0) interruptible_sleep_on_timeout(&WQ, HZ); /* reap the zombie-daemons */ waitpid_result = 1; while (waitpid_result>0) waitpid_result = waitpid(-1, NULL,__WCLONE|WNOHANG); /* stop listening */ StopListening(vs); vs->start = 0; atomic_set(&vs->running, 0); atomic_dec(&tcp_vs_daemon_count); return 0; } static int master_daemon(void *unused) { sigset_t tmpsig; int waitpid_result; struct list_head *l; struct tcp_vs *vs; DECLARE_WAIT_QUEUE_HEAD(WQ); MOD_INC_USE_COUNT; sprintf(current->comm,"ktcpvs master"); lock_kernel(); /* This seems to be required for exit_mm */ exit_mm(current); /* Block all signals except SIGKILL and SIGSTOP */ spin_lock_irq(¤t->sigmask_lock); tmpsig = current->blocked; siginitsetinv(¤t->blocked, sigmask(SIGKILL) | sigmask(SIGSTOP) ); recalc_sigpending(current); spin_unlock_irq(¤t->sigmask_lock); /* main loop */ while (sysctl_ktcpvs_unload==0) { list_for_each (l, &tcp_vs_list) { vs = list_entry(l, struct tcp_vs, list); if (!atomic_read(&vs->running) && vs->start) kernel_thread(tcp_vs_daemon, vs, 0); } if (signal_pending(current)) break; current->state = TASK_INTERRUPTIBLE; interruptible_sleep_on_timeout(&WQ, HZ); /* reap the daemons */ waitpid_result = waitpid(-1, NULL, __WCLONE|WNOHANG); } /* Wait for tcp_vs daemons to stop, one second per iteration */ while (atomic_read(&tcp_vs_daemon_count) > 0) interruptible_sleep_on_timeout(&WQ, HZ); /* reap the zombie-daemons */ waitpid_result = 1; while (waitpid_result>0) waitpid_result = waitpid(-1, NULL, __WCLONE|WNOHANG); /* unbind scheduler */ for (l=&tcp_vs_list; l->next!=&tcp_vs_list; ) { vs = list_entry(l->next, struct tcp_vs, list); tcp_vs_del_virtualserver(vs); } TCP_VS_INFO("master daemon stopped. \n" "You can unload the module now.\n"); MOD_DEC_USE_COUNT; return 0; } #ifdef MODULE int ktcpvs_init(void) #else int __init ktcpvs_init(void) #endif { TCP_VS_INFO("Initializing ktcpvs daemon\n"); atomic_set(&tcp_vs_daemon_count, 0); atomic_set(&tcp_vs_index, 1); INIT_LIST_HEAD(&tcp_vs_list); tcp_vs_control_start(); (void)kernel_thread(master_daemon, NULL, 0); return 0; } int ktcpvs_cleanup(void) { tcp_vs_control_stop(); return 0; } #ifdef MODULE EXPORT_NO_SYMBOLS; int init_module(void) { if(ktcpvs_init() != 0) return -EIO; TCP_VS_INFO("ktcpvs module loaded.\n"); return 0; } void cleanup_module(void) { if(ktcpvs_cleanup() != 0) TCP_VS_INFO("cannot remove ktcpvs module\n"); else TCP_VS_INFO("ktcpvs module unloaded.\n"); } #endif /* MODULE */