Android开发进阶之NIO非阻塞包(八)

在整个DDMS中体现Android NIO主要框架的要数MonitorThread.java这个文件了,有关PC和Android手机同步以及NIO非阻塞编程的精髓可以在下面的文件中充分体现出来。

final class MonitorThread extends Thread {

private static final int CLIENT_READY = 2;

private static final int CLIENT_DISCONNECTED = 3;

private volatile boolean mQuit = false;

private ArrayList<Client> mClientList; //用一个数组保存客户端信息

private Selector mSelector;

private HashMap<Integer, ChunkHandler> mHandlerMap; //这里Android123提示大家,由于在多线程中concurrentHashMap效率比HashMap更安全高效,推荐使用并发库的这个替代版本。

private ServerSocketChannel mDebugSelectedChan; //一个用于调试的服务器通道

private int mNewDebugSelectedPort;

private int mDebugSelectedPort = -1;

private Client mSelectedClient = null;

private static MonitorThread mInstance;

private MonitorThread() {
    super("Monitor");
    mClientList = new ArrayList<Client>();
    mHandlerMap = new HashMap<Integer, ChunkHandler>();

    mNewDebugSelectedPort = DdmPreferences.getSelectedDebugPort();
}

static MonitorThread createInstance() {  //创建实例
    return mInstance = new MonitorThread();
}

static MonitorThread getInstance() { //获取实例
    return mInstance;
}

synchronized void setDebugSelectedPort(int port) throws IllegalStateException { //设置调试端口号
    if (mInstance == null) {
        return;
    }

    if (AndroidDebugBridge.getClientSupport() == false) {
        return;
    }

    if (mDebugSelectedChan != null) {
        Log.d("ddms", "Changing debug-selected port to " + port);
        mNewDebugSelectedPort = port;
        wakeup(); //这里用来唤醒所有的Selector
    } else {
        // we set mNewDebugSelectedPort instead of mDebugSelectedPort so that it's automatically
        mNewDebugSelectedPort = port;
    }
}

synchronized void setSelectedClient(Client selectedClient) {
    if (mInstance == null) {
        return;
    }

    if (mSelectedClient != selectedClient) {
        Client oldClient = mSelectedClient;
        mSelectedClient = selectedClient;

        if (oldClient != null) {
            oldClient.update(Client.CHANGE_PORT);
        }

        if (mSelectedClient != null) {
            mSelectedClient.update(Client.CHANGE_PORT);
        }
    }
}

Client getSelectedClient() {
    return mSelectedClient;
}

boolean getRetryOnBadHandshake() {
    return true; // TODO? make configurable
}

Client[] getClients() {
    synchronized (mClientList) {
        return mClientList.toArray(new Client[0]);
    }
}

synchronized void registerChunkHandler(int type, ChunkHandler handler) {
    if (mInstance == null) {
        return;
    }

    synchronized (mHandlerMap) {
        if (mHandlerMap.get(type) == null) {
            mHandlerMap.put(type, handler);
        }
    }
}

@Override
public void run() { //本类的主要线程
    Log.d("ddms", "Monitor is up");

    try {
        mSelector = Selector.open();
    } catch (IOException ioe) {
        Log.logAndDisplay(LogLevel.ERROR, "ddms",
                "Failed to initialize Monitor Thread: " + ioe.getMessage());
        return;
    }

    while (!mQuit) {

        try {
            synchronized (mClientList) {
            }

            try {
                if (AndroidDebugBridge.getClientSupport()) {
                    if ((mDebugSelectedChan == null ||
                            mNewDebugSelectedPort != mDebugSelectedPort) &&
                            mNewDebugSelectedPort != -1) {
                        if (reopenDebugSelectedPort()) {
                            mDebugSelectedPort = mNewDebugSelectedPort;
                        }
                    }
                }
            } catch (IOException ioe) {
                Log.e("ddms",
                        "Failed to reopen debug port for Selected Client to: " + mNewDebugSelectedPort);
                Log.e("ddms", ioe);
                mNewDebugSelectedPort = mDebugSelectedPort; // no retry
            }

            int count;
            try {
                count = mSelector.select();
            } catch (IOException ioe) {
                ioe.printStackTrace();
                continue;
            } catch (CancelledKeyException cke) {
                continue;
            }

            if (count == 0) {
                continue;
            } //这里代码写的不是很好,Android开发网提示大家因为这个NIO是DDMS工作在PC端的还不明显,这样轮训的在一个while中,效率不是很高,CPU很容易占用率很高。

            Set<SelectionKey> keys = mSelector.selectedKeys();
            Iterator<SelectionKey> iter = keys.iterator(); //使用迭代器获取这个选择键

            while (iter.hasNext()) {
                SelectionKey key = iter.next();
                iter.remove(); 

                try {
                    if (key.attachment() instanceof Client) { //判断收到的key的附件是否是Client的实例
                        processClientActivity(key);
                    }
                    else if (key.attachment() instanceof Debugger) { //如果是Debug实例
                        processDebuggerActivity(key);
                    }
                    else if (key.attachment() instanceof MonitorThread) {
                        processDebugSelectedActivity(key);
                    }
                    else {
                        Log.e("ddms", "unknown activity key");
                    }
                } catch (Exception e) {
                    Log.e("ddms", "Exception during activity from Selector.");
                    Log.e("ddms", e);
                }
            }
        } catch (Exception e) {
            Log.e("ddms", "Exception MonitorThread.run()");
            Log.e("ddms", e);
        }
    }
}

int getDebugSelectedPort() {
    return mDebugSelectedPort;
}

private void processClientActivity(SelectionKey key) {
    Client client = (Client)key.attachment();

    try {
        if (key.isReadable() == false || key.isValid() == false) {
            Log.d("ddms", "Invalid key from " + client + ". Dropping client.");
            dropClient(client, true /* notify */);
            return;
        }

        client.read();

        JdwpPacket packet = client.getJdwpPacket();
        while (packet != null) {
            if (packet.isDdmPacket()) {
                // unsolicited DDM request - hand it off
                assert !packet.isReply();
                callHandler(client, packet, null);
                packet.consume();
            } else if (packet.isReply()
                    && client.isResponseToUs(packet.getId()) != null) {
                // reply to earlier DDM request
                ChunkHandler handler = client
                        .isResponseToUs(packet.getId());
                if (packet.isError())
                    client.packetFailed(packet);
                else if (packet.isEmpty())
                    Log.d("ddms", "Got empty reply for 0x"
                            + Integer.toHexString(packet.getId())
                            + " from " + client);
                else
                    callHandler(client, packet, handler);
                packet.consume();
                client.removeRequestId(packet.getId());
            } else {
                Log.v("ddms", "Forwarding client "
                        + (packet.isReply() ? "reply" : "event") + " 0x"
                        + Integer.toHexString(packet.getId()) + " to "
                        + client.getDebugger());
                client.forwardPacketToDebugger(packet);
            }

            packet = client.getJdwpPacket();
        }
    } catch (CancelledKeyException e) { //注意正确处理这个异常
        dropClient(client, true /* notify */);
    } catch (IOException ex) {
        dropClient(client, true /* notify */);
    } catch (Exception ex) {
        Log.e("ddms", ex);

        dropClient(client, true /* notify */);

        if (ex instanceof BufferOverflowException) { //可能存在缓冲区异常
            Log.w("ddms",
                    "Client data packet exceeded maximum buffer size "
                            + client);
        } else {
            // don't know what this is, display it
            Log.e("ddms", ex);
        }
    }
}

private void callHandler(Client client, JdwpPacket packet,
        ChunkHandler handler) {

    // on first DDM packet received, broadcast a "ready" message
    if (!client.ddmSeen())
        broadcast(CLIENT_READY, client);

    ByteBuffer buf = packet.getPayload();
    int type, length;
    boolean reply = true;

    type = buf.getInt();
    length = buf.getInt();

    if (handler == null) {
        // not a reply, figure out who wants it
        synchronized (mHandlerMap) {
            handler = mHandlerMap.get(type);
            reply = false;
        }
    }

    if (handler == null) {
        Log.w("ddms", "Received unsupported chunk type "
                + ChunkHandler.name(type) + " (len=" + length + ")");
    } else {
        Log.d("ddms", "Calling handler for " + ChunkHandler.name(type)
                + " [" + handler + "] (len=" + length + ")");
        ByteBuffer ibuf = buf.slice();
        ByteBuffer roBuf = ibuf.asReadOnlyBuffer(); // enforce R/O
        roBuf.order(ChunkHandler.CHUNK_ORDER);
        synchronized (mClientList) {
            handler.handleChunk(client, type, roBuf, reply, packet.getId());
        }
    }
}

synchronized void dropClient(Client client, boolean notify) {
    if (mInstance == null) {
        return;
    }

    synchronized (mClientList) {
        if (mClientList.remove(client) == false) {
            return;
        }
    }
    client.close(notify);
    broadcast(CLIENT_DISCONNECTED, client);

    /*
     * [url]http://forum.java.sun.com/thread.jspa?threadID=726715&start=0[/url]
     * [url]http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=5073504[/url]
     */
    wakeup();
}

/*
 * Process activity from one of the debugger sockets. This could be a new
 * connection or a data packet.
 */
private void processDebuggerActivity(SelectionKey key) {
    Debugger dbg = (Debugger)key.attachment();

    try {
        if (key.isAcceptable()) { //处理Server响应这个事件
            try {
                acceptNewDebugger(dbg, null); 
            } catch (IOException ioe) {
                Log.w("ddms", "debugger accept() failed");
                ioe.printStackTrace();
            }
        } else if (key.isReadable()) { //如果是收到的数据,则可读取
            processDebuggerData(key);
        } else {
            Log.d("ddm-debugger", "key in unknown state");
        }
    } catch (CancelledKeyException cke) { //记住,NIO处理这个异常,很多入门的开发者很容易忘记
        // key has been cancelled we can ignore that.
    }
}

 private void acceptNewDebugger(Debugger dbg, ServerSocketChannel acceptChan) //这里用到了阻塞方式
        throws IOException {

    synchronized (mClientList) {
        SocketChannel chan;

        if (acceptChan == null)
            chan = dbg.accept(); 
        else
            chan = dbg.accept(acceptChan);

        if (chan != null) {
            chan.socket().setTcpNoDelay(true);

            wakeup();

            try {
                chan.register(mSelector, SelectionKey.OP_READ, dbg);
            } catch (IOException ioe) {
                // failed, drop the connection
                dbg.closeData();
                throw ioe;
            } catch (RuntimeException re) {
                // failed, drop the connection
                dbg.closeData();
                throw re;
            }
        } else {
            Log.w("ddms", "ignoring duplicate debugger");
        }
    }
}

private void processDebuggerData(SelectionKey key) {
    Debugger dbg = (Debugger)key.attachment();

    try {
        dbg.read();

        JdwpPacket packet = dbg.getJdwpPacket();
        while (packet != null) {
            Log.v("ddms", "Forwarding dbg req 0x"
                    + Integer.toHexString(packet.getId()) + " to "
                    + dbg.getClient());

            dbg.forwardPacketToClient(packet);

            packet = dbg.getJdwpPacket();
        }
    } catch (IOException ioe) {
        Log.d("ddms", "Closing connection to debugger " + dbg);
        dbg.closeData();
        Client client = dbg.getClient();
        if (client.isDdmAware()) {
               Log.d("ddms", " (recycling client connection as well)");

                client.getDeviceImpl().getMonitor().addClientToDropAndReopen(client,
                    IDebugPortProvider.NO_STATIC_PORT);
        } else {
            Log.d("ddms", " (recycling client connection as well)");
            // we should drop the client, but also attempt to reopen it.
            // This is done by the DeviceMonitor.
            client.getDeviceImpl().getMonitor().addClientToDropAndReopen(client,
                    IDebugPortProvider.NO_STATIC_PORT);
        }
    }
}

private void wakeup() {
    mSelector.wakeup();
}

synchronized void quit() {
    mQuit = true;
    wakeup();
    Log.d("ddms", "Waiting for Monitor thread");
    try {
        this.join();
        // since we're quitting, lets drop all the client and disconnect
        // the DebugSelectedPort
        synchronized (mClientList) {
            for (Client c : mClientList) {
                c.close(false /* notify */);
                broadcast(CLIENT_DISCONNECTED, c);
            }
            mClientList.clear();
        }

        if (mDebugSelectedChan != null) {
            mDebugSelectedChan.close();
            mDebugSelectedChan.socket().close();
            mDebugSelectedChan = null;
        }
        mSelector.close();
    } catch (InterruptedException ie) {
        ie.printStackTrace();
    } catch (IOException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }

    mInstance = null;
}

synchronized void addClient(Client client) {
    if (mInstance == null) {
        return;
    }

    Log.d("ddms", "Adding new client " + client);

    synchronized (mClientList) {
        mClientList.add(client);

        try {
            wakeup();

            client.register(mSelector);

            Debugger dbg = client.getDebugger();
            if (dbg != null) {
                dbg.registerListener(mSelector);
            }
        } catch (IOException ioe) {
            // not really expecting this to happen
            ioe.printStackTrace();
        }
    }
}

/*
 * Broadcast an event to all message handlers.
 */
private void broadcast(int event, Client client) {
    Log.d("ddms", "broadcast " + event + ": " + client);

    /*
     * The handler objects appear once in mHandlerMap for each message they
     * handle. We want to notify them once each, so we convert the HashMap
     * to a HashSet before we iterate.
     */
    HashSet<ChunkHandler> set;
    synchronized (mHandlerMap) {
        Collection<ChunkHandler> values = mHandlerMap.values();
        set = new HashSet<ChunkHandler>(values);
    }

    Iterator<ChunkHandler> iter = set.iterator();
    while (iter.hasNext()) {
        ChunkHandler handler = iter.next();
        switch (event) {
            case CLIENT_READY:
                try {
                    handler.clientReady(client);
                } catch (IOException ioe) {
                    // Something failed with the client. It should
                    // fall out of the list the next time we try to
                    // do something with it, so we discard the
                    // exception here and assume cleanup will happen
                    // later. May need to propagate farther. The
                    // trouble is that not all values for "event" may
                    // actually throw an exception.
                    Log.w("ddms",
                            "Got exception while broadcasting 'ready'");
                    return;
                }
                break;
            case CLIENT_DISCONNECTED:
                handler.clientDisconnected(client);
                break;
            default:
                throw new UnsupportedOperationException();
        }
    }

}

/**
 * Opens (or reopens) the "debug selected" port and listen for connections.
 * @return true if the port was opened successfully.
 * @throws IOException
 */
private boolean reopenDebugSelectedPort() throws IOException {

    Log.d("ddms", "reopen debug-selected port: " + mNewDebugSelectedPort);
    if (mDebugSelectedChan != null) {
        mDebugSelectedChan.close();
    }

    mDebugSelectedChan = ServerSocketChannel.open();
    mDebugSelectedChan.configureBlocking(false); // required for Selector

    InetSocketAddress addr = new InetSocketAddress(
            InetAddress.getByName("localhost"), //$NON-NLS-1$
            mNewDebugSelectedPort);
    mDebugSelectedChan.socket().setReuseAddress(true); // enable SO_REUSEADDR

    try {
        mDebugSelectedChan.socket().bind(addr);
        if (mSelectedClient != null) {
            mSelectedClient.update(Client.CHANGE_PORT);
        }

        mDebugSelectedChan.register(mSelector, SelectionKey.OP_ACCEPT, this);

        return true;
    } catch (java.net.BindException e) {
        displayDebugSelectedBindError(mNewDebugSelectedPort);

        // do not attempt to reopen it.
        mDebugSelectedChan = null;
        mNewDebugSelectedPort = -1;

        return false;
    }
}

/*
 * We have some activity on the "debug selected" port. Handle it.
 */
private void processDebugSelectedActivity(SelectionKey key) {
    assert key.isAcceptable();

    ServerSocketChannel acceptChan = (ServerSocketChannel)key.channel();

    /*
     * Find the debugger associated with the currently-selected client.
     */
    if (mSelectedClient != null) {
        Debugger dbg = mSelectedClient.getDebugger();

        if (dbg != null) {
            Log.d("ddms", "Accepting connection on 'debug selected' port");
            try {
                acceptNewDebugger(dbg, acceptChan);
            } catch (IOException ioe) {
                // client should be gone, keep going
            }

            return;
        }
    }

    Log.w("ddms",
            "Connection on 'debug selected' port, but none selected");
    try {
        SocketChannel chan = acceptChan.accept();
        chan.close();
    } catch (IOException ioe) {
        // not expected; client should be gone, keep going
    } catch (NotYetBoundException e) {
        displayDebugSelectedBindError(mDebugSelectedPort);
    }
}

private void displayDebugSelectedBindError(int port) {
    String message = String.format(
            "Could not open Selected VM debug port (%1$d). Make sure you do not have another instance of DDMS or of the eclipse plugin running. If it's being used by something else, choose a new port number in the preferences.",
            port);

    Log.logAndDisplay(LogLevel.ERROR, "ddms", message);
}

}

从上面来看Android的开源代码有关PC上的写的不是很好,很多实现的地方都是用了严重的缝缝补补方式解决,有些习惯不是很到位,有关本NIO例子由于涉及的项目对象多,理解需要网友深入分析DDMS源码中的每个对象。细节写的不是很理想,Android123推荐大家,画出UML后再分析更清晰。