I'm working on a simple implementation of the Message Passing Interface (MPI). There are multiple machines, all connected to each other, and I have used Java sockets. I use a Selector on both the client machines and the server machines ("client" here means a machine that initiates connections to other machines; once all the machines are connected, they can pass messages to each other). On the client machines, however, the selector delivers a message only the first time, i.e. only once. After that the clients receive nothing, even though the server machines send them several more messages. I can't find any other problem anywhere: the servers receive and send messages perfectly, and the program otherwise runs properly. The clients have no problem sending messages, only receiving them. I can't provide the code, but I hope the intelligent and skilled helpers here can understand.
I am developing a socket-based application using the nio package. I am a little new to all this and have learned a great deal from reading these posts. I have a few quick questions/assumptions I was hoping could be worked out:
1) Using Selectors: Never set the SelectionKey to both read and write. Rather, switch these back and forth using interestOps() as my application dictates. I will need to ack each incoming message. So my interestOps would go from read -> write -> read. I would read the message, ack it, and then go back to reading again. Is this the correct way of doing this?
2) Large reads: The size of inbound messages will be large and variable. In this case, is it preferred to read one "chunk" per iteration over the Selector? After each read, my code will then determine if I've finished reading a "message". My thinking is that it would be a bad idea to do all my reads in one select() cycle because this would potentially hold up other channels from processing.
Do these assumptions sound sane or am I completely off here?
Before I attempt to answer your questions, I must warn you that if your client-server communication follows a complicated protocol, then using nio becomes complicated.
I don't know if you're doing a university project, but if you are, my advice is to make a prototype multi-threaded server first and then try making a nio server if you have the time. Our pitfall was spending way too much time trying to make a nio server.
But enough rambling, now to answer your question.
1) Yes, you are right about switching the interestOps between read and write as needed. If you register both read and write at the same time, the server goes ape-**** because a channel is writable almost all the time, so the selector would constantly be selecting the channels.
2) Yes, you need to keep track of what you have read to determine whether it is a complete message or only part of one. That is precisely what makes nio servers so complicated. You have to compromise between the size of the buffer you are reading into and the way you determine whether a message has been completely received. You could reserve end-of-message markers to determine when a message has been received in its entirety.
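The end-of-message marker idea in 2) can be sketched roughly as follows. This is only an illustration, not code from anyone in this thread: the class name `MessageFramer`, the choice of '\n' as the marker and the ISO-8859-1 decoding are all assumptions. The point is that each read hands over one chunk, and only the messages completed so far come back, so a large message can arrive over several select() cycles.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: accumulates the bytes of successive reads and
// returns only the messages that are complete so far.
class MessageFramer {
    // Characters received but not yet terminated by the marker.
    private final StringBuilder pending = new StringBuilder();

    // Call once per read with the flipped buffer.
    // '\n' is the assumed end-of-message marker.
    List<String> feed(ByteBuffer chunk) {
        pending.append(StandardCharsets.ISO_8859_1.decode(chunk));
        List<String> complete = new ArrayList<>();
        int end;
        while ((end = pending.indexOf("\n")) >= 0) {
            complete.add(pending.substring(0, end));
            pending.delete(0, end + 1); // drop the message and its marker
        }
        return complete;
    }
}
```

One framer per channel (for example as the key's attachment) keeps partial messages from different connections apart.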
I'm not too sure what you mean here:
> My thinking is that it would be a bad idea to do all my reads in one select() cycle because this would potentially hold up other channels from processing.
But you should use a thread pool to execute the tasks submitted by the clients.
I have exams for the moment but if you contact me on:
N.Pontikos at cs dot ucl.ac dot uk
then i will be very much obliged to show you my code and try and help out after the 26th of may.
Thanks for the speedy reply. I've done a bit of work with sockets using non nio methods and in this case I will probably continue that. I've been exploring using nio but after seeing some of the issues faced, and considering I won't need to support many clients, I will take your advice and not use it for now.
Regarding OP_WRITE, I think that I and pkwooster have the same opinion.
Always assume that you can write, and be free to write at once.
If you get an incomplete write, signal interest in OP_WRITE and keep buffering the output until OP_WRITE gets selected, at which point you try to flush the buffer. If all goes well, remove interest in OP_WRITE and continue to assume that you are always able to write, until you get another incomplete write, where you signal interest in OP_WRITE again, and so on.
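A minimal sketch of that write strategy, under some assumptions: the class `OptimisticWriter` and its method names are made up for illustration, the key's channel is assumed to be a non-blocking writable channel such as a SocketChannel, and all calls are assumed to happen on the selector thread.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical per-connection writer (not from the thread's code).
class OptimisticWriter {
    private final SelectionKey key;
    private final WritableByteChannel channel;
    private final Deque<ByteBuffer> backlog = new ArrayDeque<>();

    OptimisticWriter(SelectionKey key) {
        this.key = key;
        this.channel = (WritableByteChannel) key.channel();
    }

    // Assume we can write: try at once, queue only what did not fit.
    void send(ByteBuffer data) throws IOException {
        if (backlog.isEmpty()) {
            channel.write(data);
        }
        if (data.hasRemaining()) {
            backlog.add(data);
            // Incomplete write: now (and only now) ask for OP_WRITE.
            key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
        }
    }

    // Call from the select loop when key.isWritable() is true.
    void flush() throws IOException {
        while (!backlog.isEmpty()) {
            ByteBuffer head = backlog.peek();
            channel.write(head);
            if (head.hasRemaining()) {
                return; // still congested, keep OP_WRITE set
            }
            backlog.remove();
        }
        // Everything flushed: stop selecting for write again.
        key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
    }

    boolean hasBacklog() {
        return !backlog.isEmpty();
    }
}
```

In a select loop this would be used as `if (key.isWritable()) writer.flush();`, with the channel registered for OP_READ only at accept time.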
Regarding Problems with TIME_WAIT I only have one plausible solution, and that is SO_REUSEADDR and TCP_NODELAY.
I am struggling with the exact same symptom - works on Windows but hangs on the register in Linux. Can you possibly reach back and remember some details?
Thanks
register and interestOps will block if they are done from another thread while the select is active. That's why I use a list of things that must be done in the select loop.
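The "list of things that must be done in the select loop" could look something like the sketch below. It is not the poster's actual code: `SelectLoop`, `invokeLater` and `registerLater` are hypothetical names. Other threads never touch the selector's keys directly; they queue a task and call wakeup(), and the loop runs the task between two select() calls.

```java
import java.io.IOException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.Selector;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical select loop that is the only thread touching the selector's keys.
class SelectLoop implements Runnable {
    private final Selector selector;
    private final Queue<Runnable> pending = new ConcurrentLinkedQueue<>();
    private volatile boolean running = true;

    SelectLoop(Selector selector) {
        this.selector = selector;
    }

    // Safe to call from any thread.
    void invokeLater(Runnable task) {
        pending.add(task);
        selector.wakeup(); // make a blocked select() return
    }

    // Registration is deferred to the loop thread instead of done in place.
    void registerLater(SelectableChannel channel, int ops, Object attachment) {
        invokeLater(() -> {
            try {
                channel.register(selector, ops, attachment);
            } catch (IOException e) { // e.g. the channel was closed meanwhile
                e.printStackTrace();
            }
        });
    }

    void stop() {
        running = false;
        selector.wakeup();
    }

    @Override
    public void run() {
        try {
            while (running) {
                selector.select();
                // Run queued register/interestOps changes on this thread.
                Runnable task;
                while ((task = pending.poll()) != null) {
                    task.run();
                }
                // ... handle selector.selectedKeys() here ...
                selector.selectedKeys().clear();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
```

The same `invokeLater` route works for interestOps() changes requested by worker threads.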
To: PkWooster
I was working my way through NIO and then saw this topic.
I modified my code a little and your writeReady theory seemed right.
I tried to implement it, but then I was not able to write at all!
Could you look at my code and help me out? I have tried to find this bug
all day without success.
Server.java
import java.io.IOException;
import java.net.*;
import java.nio.channels.*;
import java.util.*;
public class Server {
private Map<SocketChannel,ChannelIO> channels = new HashMap<SocketChannel,ChannelIO>();
private ServerSocketChannel server;
private SocketChannel socket;
private Selector selector;
private int port = 2000;
public Server ( ) {
System.out.println("__constructor Server");
}
public Server ( int port ) {
System.out.println("__constructor Server (int port)");
this.port = port;
}
public void init() throws IOException,UnknownHostException {
System.out.println("init() Server");
server = ServerSocketChannel.open();
server.configureBlocking(false);
InetAddress ia = InetAddress.getLocalHost();
InetSocketAddress isa = new InetSocketAddress(ia,port);
server.socket().bind( isa );
}
public void runServer() throws IOException {
init();
System.out.println("runServer() Server");
selector = Selector.open();
SelectionKey acceptKey = server.register(selector, SelectionKey.OP_ACCEPT);
while (acceptKey.selector().select()>0)
{
Set readKeys = selector.selectedKeys();
Iterator it = readKeys.iterator();
while (it.hasNext()) {
SelectionKey key = (SelectionKey) it.next();
if (key.isAcceptable()) doAccept(key);
if (key.isReadable()) doRead(key);
if (key.isWritable()) doWrite(key);
it.remove();
}
}
}
public void doRead(SelectionKey key) {
ChannelIO cio = (ChannelIO) key.attachment();
cio.doRead();
}
public void doWrite(SelectionKey key) {
ChannelIO cio = (ChannelIO) key.attachment();
cio.doWrite();
}
public void doAccept(SelectionKey key) {
System.out.println("Accepting...");
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
try {
socket = (SocketChannel) ssc.accept();
socket.configureBlocking(false);
socket.socket().setKeepAlive(true);
SelectionKey sk = socket.register(selector,SelectionKey.OP_READ|SelectionKey.OP_WRITE);
ChannelIO cio = new ChannelIO( sk, channels);
channels.put(socket,cio);
sk.attach(cio);
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
Server ChitChat = new Server();
try
{
ChitChat.runServer();
}
catch (IOException e)
{
e.printStackTrace();
System.exit(-1);
}
System.out.println("close() Server");
}
}
ChannelIO.java
import java.io.IOException;
import java.nio.*;
import java.nio.charset.*;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.*;
public class ChannelIO {
private LinkedList<String> sendQueue = new LinkedList<String>();
private ByteBuffer recvBuffer = null;
private ByteBuffer sendBuffer = null;
private boolean writeReady = true;
private SelectionKey sk = null;
private StringBuffer recvString = new StringBuffer();
private CharsetEncoder encoder;
private CharsetDecoder decoder;
public SocketChannel sc;
public Map<SocketChannel,ChannelIO> channels;
public ChannelIO(SelectionKey sc, Map<SocketChannel,ChannelIO> users) {
this.sc=(SocketChannel) sc.channel();
this.channels=users;
this.sk = sc;
Charset charset = Charset.forName("ISO-8859-1");
decoder = charset.newDecoder();
encoder = charset.newEncoder();
recvBuffer = ByteBuffer.allocate(8196);
}
public void doSay(String s) {
Iterator it = channels.values().iterator();
while (it.hasNext()) {
ChannelIO socket = (ChannelIO)it.next();
if (socket==this) {
socket.sendQueue.add("You say: "+s);
}
else {
socket.sendQueue.add("Someone say: " +s);
}
}
}
public void doRead() {
if (sc.isOpen()) {
int len;
recvBuffer.clear();
try { len = sc.read(recvBuffer); }
catch( IOException e){e.printStackTrace(); len=-1;}
if (len > 0) {
recvBuffer.flip();
CharBuffer buf = null;
try { buf = decoder.decode(recvBuffer); }
catch(Exception ce){ce.printStackTrace(); len = -1;}
doSmth( buf );
}
if (len<0) close();
}
}
public void doSmth( CharBuffer buf) {
int i=0;
int j=0;
recvString.append(buf);
int z = recvString.length();
while (i<z && i>=0) {
char c = recvString.charAt(i);
if (c=='\r' || c=='\n') {
i++;
if(c == '\r' && i < z && '\n' == recvString.charAt(i)) i++;
doSay(recvString.substring(j,i));
j=i+1;
}
else if (c=='\b' && z>0) {
if (recvString.length()>2) {
recvString.deleteCharAt(i-1);
z--; i--;
}
recvString.deleteCharAt(recvString.indexOf("\b"));
z--; i--;
}
else i++;
}
if(j < z)recvString.delete(0,j);
else recvString = new StringBuffer();
}
public void doWrite() {
// System.out.println("Siin");
// sk.interestOps(SelectionKey.OP_READ);
// writeReady = true;
if(sendBuffer != null) write(sendBuffer);
writeQueued();
}
public void writeQueued() {
// while (writeReady && sendQueue.size()>0) {
while (sendQueue.size()>0) {
String msg = (String)sendQueue.remove(0);
try {
write(encoder.encode(CharBuffer.wrap(msg)));
} catch (Exception e) {
e.printStackTrace();
}
}
}
public void write(ByteBuffer data) {
if (sc.isOpen()) {
int len=0;
if(data.hasRemaining())
{
try{len = sc.write(data);}
catch(IOException e){e.printStackTrace(); }
}
if(data.hasRemaining())
{
// writeReady = false;
// sk.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
sendBuffer = data;
}
else sendBuffer = null;
}
}
public void close() {
System.out.println("Leaving..");
if(sc.isOpen())
{
try
{
sc.close();
sk.selector().wakeup();
sk.attach(null);
}
catch(IOException e){ e.printStackTrace(); }
}
}
}
I forgot to mention that I also get an error when a client closes its window.
This error is related to SelectionKeys.
Sometimes, at random, it also crashes in doRead.
I tried to do something with \b (backspace) handling, but it may not work very well.
It would be great if someone could help me fix these bugs.
If you have some spare time, maybe you (pkwooster) would like to
update your examples with backspace code?
I don't mind if you use my solution :)
I took a quick look and the problem is the same OP_WRITE problem. You enable it in doAccept and never disable it. This causes a tight loop between your select and the write method. The code you have commented out looks closer to correct.
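The tight loop can be reproduced without a network, under the assumption that a `Pipe` sink behaves like an idle socket here (both are writable whenever their send buffer has room). The class and method names below are made up for the demonstration. With OP_WRITE left in the interest set, every select should report the key as ready even though there is nothing to send; with no write interest it should stay quiet.

```java
import java.io.IOException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

// Hypothetical demo class; a Pipe sink stands in for an idle client socket.
class WriteInterestDemo {
    // Counts how many of `rounds` non-blocking selects report the key as ready.
    static int readyRounds(int ops, int rounds) throws IOException {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            pipe.sink().configureBlocking(false);
            pipe.sink().register(selector, ops);
            int ready = 0;
            for (int i = 0; i < rounds; i++) {
                ready += selector.selectNow();
                // Same effect as it.remove() in the posted loop.
                selector.selectedKeys().clear();
            }
            pipe.sink().close();
            pipe.source().close();
            return ready;
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("OP_WRITE always on: " + readyRounds(SelectionKey.OP_WRITE, 5));
        System.out.println("no write interest:  " + readyRounds(0, 5));
    }
}
```

In the posted Server this corresponds to registering the accepted socket with SelectionKey.OP_READ only and turning OP_WRITE on just while unsent data is buffered.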
The reason I commented it out is that it hangs when it is uncommented.
Any ideas why it crashes when a client leaves?
And I am still interested in the OP_WRITE stuff.
Maybe you could run it on your machine? Then you could see what the problem is.
PS. I copied your version of NIOServer to my laptop, and when I tested it
I got the same problem. It hangs and does not let me write to the user.
I'm using Windows; does that make any difference?
Sorry for spamming.
I wanted to say that I got your code working; it was my machine's fault. But
if I uncomment those lines in my code, it won't write any data to the client.
And I am still trying to fix the problem of the server crashing when a client closes.
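One possible cause of the crash when a client leaves, judging only from the posted select loop: doRead() closes the channel on end of stream, closing a channel cancels its keys, and the loop then goes on to call key.isWritable() on the same key, which throws CancelledKeyException for a cancelled key. The sketch below shows just that API behaviour; the class name is made up and a `Pipe` stands in for the client socket.

```java
import java.io.IOException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

// Hypothetical demo class (not part of the posted server).
class CancelledKeyDemo {
    // Returns { key.isValid() after close, whether isReadable() threw }.
    static boolean[] closeThenTest() throws IOException {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            pipe.source().configureBlocking(false);
            SelectionKey key = pipe.source().register(selector, SelectionKey.OP_READ);
            pipe.source().close(); // closing the channel cancels its keys
            boolean validAfterClose = key.isValid();
            boolean threw = false;
            try {
                key.isReadable();
            } catch (CancelledKeyException e) {
                threw = true;
            }
            pipe.sink().close();
            return new boolean[] { validAfterClose, threw };
        }
    }
}
```

If this is indeed what happens, guarding each test in the loop, for example `if (key.isValid() && key.isWritable()) doWrite(key);`, should avoid the exception.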