Friday, March 9, 2007

Simple Socket Programming 2. Revenge of the chat.

See Part 1.

Chat is a much more interesting server application than a web server. Each thread of control must communicate with every other thread or, at least, with some sort of authority that communicates with every other thread.

I'm going to be explicit about where declarations come from. It's a lot easier to sort out where to look for documentation with this style of import.



> import Control.Concurrent (forkIO)
> import Control.Concurrent.STM (STM, TVar, atomically, newTVar, readTVar, writeTVar)
> import Control.Exception (bracket, finally)
> import Network (PortID(..), accept, sClose, listenOn)
> import System.IO (Handle, hClose, hFlush, hGetLine, hPutStrLn)



There is a whole different set of problems to solve when clients can talk to each other. The original threaded server simply used the bracket function to take care of all of the looping. Chat requires each thread to have access to every other thread.



> threaded talk =
>     do { state <- atomically (newTVar [])
>        ; bracket (listenOn $ PortNumber 8000)
>                  (sClose)
>                  (loop state) }
>     where loop s sock = do { c <- accept sock
>                            ; threadCreator s talk c
>                            ; loop s sock}




Three changes. First, the creation of state. This state variable is passed down to each client thread. This is the key to inter-thread communication: every thread can read and write this variable, so they have a way to talk to each other. The second big change is simplifying the where clause. loop is a little bigger than I'm comfortable with, but I think the code is clear. handle was pulled out and promoted to a top-level function, threadCreator. Third, the protocol was removed from the server. threaded and threadCreator only concern themselves with socket-level interaction. The protocol for using the sockets is dealt with by whatever talk function is passed in.




> threadCreator state talk (h, n, p) =
>     do { manipState state (\x -> h:x)
>        ; forkIO $ finally (talk h state)
>                           (do { manipState state
>                                            (\x -> filter (h /=) x)
>                               ; hClose h})}



threadCreator has a lot more work to do. When threaded accepts a connection, threadCreator puts that connection into the global state. Conceptually, when we get a connection, we cons it onto the list of all the connections. Then, when we're done talking to a client, threadCreator removes the handle from the list of connections and closes it. manipState does the work of adding and removing the connections from the list.

threadCreator got some complexity from exception handling. Before, a thread could lose its socket and just die. Now we have special resources that must be cleaned up. threadCreator has one wonderful property that I feel more than makes up for its clumsiness: it cleans up all of the resources it creates with no special handling. Everything you ever need to know about manipulating the global state is right there. All of the side effects are locked down to six lines of code. Anyone who's spent some time chasing malloc/free or new/delete pairs will appreciate how important this really is.
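
The register-then-always-unregister pattern described above can also be written with bracket_ from Control.Exception. This is only a sketch, not the code used in this post: withRegistered and demo are hypothetical names, and modifyTVar' assumes a reasonably recent stm package.

```haskell
import Control.Concurrent.STM (TVar, atomically, modifyTVar', newTVarIO, readTVarIO)
import Control.Exception (IOException, bracket_, try)

-- Hypothetical helper: keep x in the shared list only while the action
-- runs. The removal happens even if the action throws.
withRegistered :: Eq a => TVar [a] -> a -> IO b -> IO b
withRegistered state x action =
  bracket_ (atomically (modifyTVar' state (x :)))
           (atomically (modifyTVar' state (filter (/= x))))
           action

-- Returns the list as seen during a successful action, and the list
-- left behind after a second action failed with an IO error.
demo :: IO ([Int], [Int])
demo = do
  state  <- newTVarIO []
  during <- withRegistered state 1 (readTVarIO state)
  _      <- try (withRegistered state 2 (ioError (userError "client hung up")))
              :: IO (Either IOException ())
  after  <- readTVarIO state
  return (during, after)
```

Running demo in ghci should show the entry present while the action runs and gone afterwards, whether or not the action failed.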



> manipState state op =
>     atomically $ do { ls <- readTVar state
>                     ; writeTVar state $ op ls}




manipState is a simple helper. Perhaps it should go in a where block of threadCreator, but I'm wary of extensive where blocks. The point is: given some state, read the state, operate on it, then write the state back, all inside a single atomic transaction.
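
To see why the read-operate-write step needs to be one transaction, here is a small sketch that hammers a TVar from many threads. It assumes a newer stm package that provides modifyTVar', which does the same job as manipState; manipState' and demo are made-up names for illustration.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Concurrent.STM (TVar, atomically, modifyTVar', newTVarIO, readTVarIO)
import Control.Monad (forM)

-- Same job as manipState, written with the library's modifyTVar'.
manipState' :: TVar a -> (a -> a) -> IO ()
manipState' state op = atomically (modifyTVar' state op)

-- Fork n threads that each cons one number onto the shared list, wait
-- for all of them to finish, and report how many entries arrived.
demo :: Int -> IO Int
demo n = do
  state <- newTVarIO []
  dones <- forM [1 .. n] $ \i -> do
    done <- newEmptyMVar
    _ <- forkIO (manipState' state (i :) >> putMVar done ())
    return done
  mapM_ takeMVar dones
  fmap length (readTVarIO state)
```

Because each update is a single transaction, no cons should be lost however the threads interleave, so demo n is expected to return n.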




> tell h s = hPutStrLn h s >> hFlush h
> echo h s = tell h "echo!" >>
>            loop
>     where loop = hGetLine h >>= tell h >> loop




Here we have a simple echo talk protocol to verify everything compiles and appears to work. Running threaded echo in ghci fires up the server. It appears to echo back whatever I type at it. It also appears well behaved on disconnect. The last step is to really chat, with threads talking to each other.

> tellAll hs s = mapM_ (\x -> tell x s) hs

> chat :: Handle -> TVar [Handle] -> IO a
> chat h s = tell h "chat!" >>
>            loop
>     where loop = do { msg <- hGetLine h
>                     ; ls <- atomically (readTVar s)
>                     ; tellAll ls msg
>                     ; loop }

Try threaded chat from a ghci prompt, or main = threaded chat for ghc.

The chat client handler thread reads a message, then writes that message to every open handle. The one big problem with this approach is simultaneous messages. There's a race condition where two threads may write to the same output handle at the same time. This could garble client 1's message "foo" and client 2's message "bar", resulting in something like "fboo" and "ar". More serious is the case where client 1 disconnects just as client 2's thread is writing to client 1's handle. Client 2's thread will get an IO exception and client 2 will be disconnected.
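
One blunt way to paper over both problems, sketched here as an assumption rather than as the fix this series settles on, is to serialise all writes behind a single MVar lock and to catch IO errors per handle. tellSafely, tellAllSafely and demo are hypothetical names; demo writes to a temporary file in the current directory instead of a socket.

```haskell
import Control.Concurrent.MVar (MVar, newMVar, withMVar)
import Control.Exception (IOException, try)
import System.Directory (removeFile)
import System.IO (Handle, hClose, hFlush, hPutStrLn, openTempFile)

-- Write one line while holding the lock, so two threads cannot
-- interleave their output. Returns False if the write failed.
tellSafely :: MVar () -> Handle -> String -> IO Bool
tellSafely lock h msg = do
  result <- try (withMVar lock (\_ -> hPutStrLn h msg >> hFlush h))
              :: IO (Either IOException ())
  return (either (const False) (const True) result)

-- Broadcast to every handle; a dead handle no longer kills the sender.
tellAllSafely :: MVar () -> [Handle] -> String -> IO ()
tellAllSafely lock hs msg = mapM_ (\h -> tellSafely lock h msg) hs

-- Write to an open handle, close it, then write again.
demo :: IO (Bool, Bool)
demo = do
  lock <- newMVar ()
  (path, h) <- openTempFile "." "chat-demo.txt"
  open <- tellSafely lock h "hello"
  hClose h
  closed <- tellSafely lock h "anyone there?"
  removeFile path
  return (open, closed)
```

The cost of a single global lock is that one slow client stalls everyone else's output, which is part of why a queue per client looks more attractive.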

These problems could be solved with more state and more error checking. I'll address them in part 3. As a preview, Haskell threads are so inexpensive that I think a good solution would be one thread for reading every handle, and another thread for writing every handle. That approach needs more sophisticated shared state. The basic idea is that each reader thread calls a dispatch function, much like talk. The dispatcher writes to one or more TChan channels. The writer threads take data from the TChan and put it on the socket.
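
As a rough sketch of that idea, and only a guess at the shape part 3 might take, each client could own one outgoing TChan, with the shared state holding the list of channels instead of the list of handles. Outbox, register, dispatch, writer and demo are all hypothetical names.

```haskell
import Control.Concurrent.STM
  (STM, TChan, TVar, atomically, modifyTVar', newTChan, newTVarIO,
   readTChan, readTVar, writeTChan)
import Control.Monad (forever)
import System.IO (Handle, hFlush, hPutStrLn)

-- One outgoing queue per client.
type Outbox = TChan String

-- Create a queue for a new client and add it to the shared list.
register :: TVar [Outbox] -> STM Outbox
register clients = do
  box <- newTChan
  modifyTVar' clients (box :)
  return box

-- Dispatcher: queue the message for every registered client.
dispatch :: TVar [Outbox] -> String -> STM ()
dispatch clients msg = readTVar clients >>= mapM_ (\box -> writeTChan box msg)

-- Writer thread body: the only code that writes to this client's handle.
writer :: Handle -> Outbox -> IO a
writer h box = forever $ do
  msg <- atomically (readTChan box)
  hPutStrLn h msg
  hFlush h

-- Register two clients, dispatch one message, and read both queues.
demo :: IO [String]
demo = do
  clients <- newTVarIO []
  a <- atomically (register clients)
  b <- atomically (register clients)
  atomically (dispatch clients "hello")
  atomically (mapM readTChan [a, b])
```

Since only the writer thread touches a given handle, two messages can no longer interleave on it, and a failed write takes down just that client's writer. The sketch leaves out removing a queue when its client disconnects.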

Jason
