Tomas Kalibera, Luke Tierney: Socket Connections Update



Starting up a PSOCK cluster is not fast. In R 3.6, on a laptop that is only a few years old, has 8 logical cores and runs Windows, it takes about 1.7s to start a cluster with 8 nodes:

library(parallel); system.time(cl <- makePSOCKcluster(8))

A good design is to start a cluster only once during an R session and then pass it to computations that can take advantage of it. This is needed so that the end user always has full control over how many cores are used in total. Starting a cluster in package code out of direct control of the user often causes big slowdowns by overloading the machine, resulting in much worse performance than sequential execution.
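As a minimal sketch of this design, the computation below receives an already running cluster as an argument instead of creating its own. The helper names col_sum and col_sums_par and the cluster size of 2 are illustrative assumptions, not part of the parallel package.

```r
library(parallel)

# Hypothetical worker function, defined at top level so that only the
# function and its arguments are sent to the nodes.
col_sum <- function(j, m) sum(m[, j])

# Hypothetical computation that uses whatever cluster the caller provides.
col_sums_par <- function(cl, m) {
  unlist(parLapply(cl, seq_len(ncol(m)), col_sum, m = m))
}

cl <- makePSOCKcluster(2)        # started once; the user chooses the size
m <- matrix(1:6, nrow = 2)
res <- col_sums_par(cl, m)       # the same cluster can serve later calls too
stopCluster(cl)
```

Because the cluster is created in one place, the total number of worker processes stays under the user's control.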

The 1.7s may thus seem acceptable, but if we start a larger cluster on a server machine with many cores, one node per logical core, the startup times become prohibitively large. On a recent Fedora server with 64 logical cores, it takes about 14s. On an old Solaris server with 64 logical cores, it takes 211s!

In R-devel, we have extended the sockets API and re-designed the startup of a PSOCK cluster. The Windows laptop mentioned above now starts the cluster in 0.5s in R-devel. Timings for several other machines with more cores are:

Machine                      R 3.6    R-devel
Fedora server (64 cores)     14s      0.4s
Ubuntu server (40 cores)     6.6s     0.4s
Windows server (48 cores)    9.3s     0.5s
Solaris server (64 cores)    211s     7s
macOS desktop (12 cores)     4.2s     0.7s

The speedup is large when a machine has many cores but each individual core is slow.
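To put numbers on this, the speedup factors can be computed directly from the timings above. This is just arithmetic on the reported values; the data frame and its column names are made up for the illustration.

```r
# Timings copied from the table above (seconds)
timings <- data.frame(
  machine = c("Fedora server", "Ubuntu server", "Windows server",
              "Solaris server", "macOS desktop"),
  cores   = c(64, 40, 48, 64, 12),
  r36     = c(14, 6.6, 9.3, 211, 4.2),
  rdevel  = c(0.4, 0.4, 0.5, 7, 0.7)
)

# Speedup factor: old startup time divided by new startup time
timings$speedup <- round(timings$r36 / timings$rdevel, 1)
print(timings)
```

The factors range from about 6 on the 12-core desktop to about 35 on the 64-core Fedora server.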

Compatibility

The rest of this post describes in detail how these performance improvements were achieved. The socket layer improvements do not change documented behavior for the existing API, but they do change observable behavior (timeouts are sometimes enforced where they were not before, and blocking = FALSE is respected on the server end of a connection). The usual way of testing R changes, comparing results of package checks, does not help much here, because the CRAN policies limit the number of cores to 2 for the checks (to prevent overloading of the check machines). Users relying on parallelization or socket communication are hence invited and encouraged to run any tests they have on large systems and report any new bugs.

Starting a PSOCK cluster in R 3.6

R 3.6 and earlier starts a cluster sequentially. For each node, the server issues a system() command to start the node, then waits via socketConnection(server = TRUE) for the node to connect, and then does the same for the second node, etc. Nearly all the time to start a cluster is spent in starting all the R sessions.

Although this is simple on the server, it is not simple in the nodes: when a node is started, it tries to connect to the server via socketConnection(server = FALSE), but the connection may fail due to a race condition. The operating system may decide to run socketConnection in the node before socketConnection in the server, and then the connection will fail. Therefore, even in the R 3.6 cluster setup, the nodes had to retry after a failure, and they did so with an exponential backoff.

There is no way to avoid this race condition, because socketConnection(server = TRUE) does all three of the server socket operations: bind(), listen(), and accept(). It always creates and binds a temporary server socket, puts it into the listening state, waits for a connection via accept(), and then destroys the server socket. In the time intervals when there is no listening server socket on the given port on the server, the connection from the node will fail.

In practice, this has been working well, because on typical operating systems the server will soon be scheduled and the number of retries will be small. However, if we were starting the nodes in parallel, the number of retries would probably increase dramatically with the increasing number of nodes, which would damage performance.
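The node-side retry with exponential backoff described above can be sketched roughly as follows. This is not the code used by the parallel package: the helper name connect_with_backoff, the number of attempts, the initial delay, the scale factor and the 5s timeout are all illustrative assumptions.

```r
# Hypothetical helper: retry a client connection with exponential backoff.
connect_with_backoff <- function(host, port, max_tries = 8L,
                                 delay = 0.1, scale = 1.5) {
  for (i in seq_len(max_tries)) {
    con <- tryCatch(
      suppressWarnings(
        socketConnection(host = host, port = port, server = FALSE,
                         blocking = TRUE, open = "a+b", timeout = 5)),
      error = function(e) NULL)      # e.g. no listening socket yet
    if (!is.null(con)) return(con)   # success: return the open connection
    Sys.sleep(delay)                 # wait before the next attempt
    delay <- delay * scale           # grow the delay geometrically
  }
  stop("could not connect to ", host, ":", port,
       " after ", max_tries, " attempts")
}
```

A node would call something like connect_with_backoff("localhost", port) once at startup. With many nodes started at the same time, each failed attempt adds its delay to the total startup time, which is why the retries matter for performance.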

Server socket connections

We have, instead, extended the R connections API so that one can work with server socket connections directly, re-using them for accepting multiple socket connections:

serverSocket(port)
     
socketAccept(socket, blocking = FALSE, open = "a+",
             encoding = getOption("encoding"),
             timeout = getOption("timeout"))

serverSocket creates a listening server socket connection, which is of a new class "servsockconn". socketAccept accepts an incoming connection to the given server socket. socketSelect can be used on a listening server socket to wait for when a connection is ready to be accepted. The server socket connection can be closed by close as usual.
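A minimal single-process sketch of how these functions fit together is shown below, assuming an R version that includes the new API. The port number and the 5s timeouts are arbitrary choices for the illustration, and the port is assumed to be free. In a real cluster the clients would be separate R processes.

```r
port <- 11235L                   # arbitrary port, assumed to be free
srv <- serverSocket(port)        # bind and listen once

# Two clients connect; pending connections wait in the listen backlog.
c1 <- socketConnection("localhost", port, server = FALSE,
                       blocking = TRUE, open = "a+", timeout = 5)
c2 <- socketConnection("localhost", port, server = FALSE,
                       blocking = TRUE, open = "a+", timeout = 5)

# Wait until a connection is ready, then accept both connections
# from the same listening server socket.
ready <- socketSelect(list(srv), timeout = 5)
s1 <- socketAccept(srv, blocking = TRUE, open = "a+", timeout = 5)
s2 <- socketAccept(srv, blocking = TRUE, open = "a+", timeout = 5)

writeLines("hello from client 1", c1)
writeLines("hello from client 2", c2)
msgs <- c(readLines(s1, n = 1), readLines(s2, n = 1))

for (con in list(c1, c2, s1, s2, srv)) close(con)
```

The listening socket stays open between the two socketAccept calls, so there is no window in which a client would find nothing listening on the port.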

Listen backlog

The operating system has a queue of partial and established incoming connections. Established connections are ready to be given to the application via accept() (socketAccept()). This queue has a limited length, which one can influence via an argument to listen(), but it is only a hint and one cannot programmatically find out how large the queue really is. The operating system may impose a limit and make that queue shorter. We have modified R-devel to use the maximum length supported on each system, but there is no guarantee it would be enough for all nodes. The cluster setup hence needs to behave correctly when the queue is too short.

TCP uses a 3-way handshake when establishing a connection. In the normal successful case when the queue is not full, a node sends a SYN packet, the server puts the connection into a queue and responds with SYN+ACK, the node then gives the connection to the application as established (via connect()/socketConnection(server=FALSE)) and sends an ACK to the server. The server then flags the connection as established (some systems move it to a different queue) and gives it to the application (via accept()/socketAccept()).

When the queue is full, a number of things can happen. Linux has effectively two queues, one for already established connections and one for partially established ones. Only the size of the established queue is influenced by the backlog argument to listen(), and when that queue is full, Linux already decreases the rate of adding connections to the partially-established queue. It may simply drop a SYN packet and not put an incoming connection to the partially-established queue. The TCP layer on the node will retry a few times, re-sending the SYN, but eventually give up, send a RST (reset), and the connection will fail. It is hence necessary to keep the code for retries after failure in the nodes even with the new server socket API.

Moreover, it can still happen even on Linux that an ACK from the node is received on the server, but the established queue is full. Then the server may send a RST to the node and the node will fail, because it is already blocked waiting for commands from the server on a connection it believes is already established. A similar situation arises when the server drops the ACK packet, but keeps the connection in the partially-established queue. It may then after a timeout re-send SYN+ACK to the node, the node re-sends its ACK, and eventually the connection may really succeed on the server or the server may send a RST and remove it from the partially-established queue. More information on how Linux implements the backlog is available here.

An additional complication is that the server may drop the ACK from the node and remove the connection from the partially-established queue. Empirically we have seen this during a stress test when the connections from the client were old, and the number of occurrences increased with the age of the connections, so presumably this is after a timeout. Consequently, the node then does not receive a RST and will keep waiting for a command from the server indefinitely.

This situation is known as a half-opened connection and can arise in various ways in TCP communication. It would be resolved by the TCP layer if the node started communicating over the connection, but in the PSOCK protocol, it is the server that starts communicating, so this issue needs to be handled specially.

Server-initiated handshake

We have thus changed the cluster setup procedure so that the server, as soon as it gets a connection from the node, sends an initial command to the node as a handshake. The node waits for such an initial command during the setup phase. If it does not get the command within some time (a half-opened connection closed on the server) or if the waiting fails (it gets a RST from the server), the node retries establishing a connection to the server and waits for a new handshake. This does not change the wire protocol: the handshake is just a regular command and the node runs it and sends a response.
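The exchange can be illustrated with a small single-process sketch, assuming an R version that includes the new API. The message layout (a list with fun and args), the helper name node_handshake, the port and the 5s timeouts are assumptions made for the illustration and do not reproduce the actual PSOCK message format.

```r
port <- 11237L                   # arbitrary port, assumed to be free
srv <- serverSocket(port)

# Node side: connect with a short timeout (illustrative value).
node_end <- socketConnection("localhost", port, server = FALSE,
                             blocking = TRUE, open = "a+b", timeout = 5)

# Server side: accept, then immediately send the handshake command.
server_end <- socketAccept(srv, blocking = TRUE, open = "a+b", timeout = 5)
serialize(list(fun = "identity", args = list("ready")), server_end)

# Hypothetical node-side step: wait for the handshake, run it, reply.
# Returns FALSE on timeout or reset, in which case the caller would
# close the connection and connect again.
node_handshake <- function(con) {
  cmd <- tryCatch(unserialize(con), error = function(e) NULL)
  if (is.null(cmd)) return(FALSE)
  serialize(do.call(cmd$fun, cmd$args), con)
  TRUE
}

ok <- node_handshake(node_end)
reply <- if (ok) unserialize(server_end) else NULL  # server sees the response

for (con in list(node_end, server_end, srv)) close(con)
```

Since the handshake is an ordinary command with an ordinary response, neither end needs a special message type for it.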

Connection timeouts

The R 3.6 sockets API allows defining a timeout for a socket connection at creation time. The timeout then influences most operations on the socket (it applies individually to low-level operations, but the R-level functions may wait somewhat longer in total). PSOCK clusters use connection timeouts of 30 days: if there is no command from the server to the node within that time (e.g. the server crashes or the connection is lost), the node exits on its own. For the handshake during cluster setup, we would need a much shorter timeout. We have hence extended the API to allow modifying the timeout of a socket connection:

socketTimeout(socket, timeout = -1)

This new function allows using a short timeout during the handshake and in socketConnection invocation on the node, which was previously effectively blocking (blocking on Linux due to a select() bug, timeout of 30 days on other systems). After the handshake, socketTimeout is called to increase the timeout again to 30 days.

In addition to helping with half-opened connections that are open only on the client (via the timeout), the server-initiated handshake also gets rid of any half-opened connection that is open only on the server. The TCP layer will detect those and fail when the server starts communicating. We have observed these as well during our initial stress testing.
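The use of socketTimeout around the handshake can be sketched as follows, again in a single process and assuming an R version that includes the new API. The 5s setup timeout and the port are illustrative assumptions. The 30-day value comes from the text above.

```r
port <- 11236L                         # arbitrary port, assumed to be free
setup_timeout <- 5                     # seconds; illustrative short timeout
long_timeout  <- 60 * 60 * 24 * 30     # 30 days, in seconds

srv <- serverSocket(port)
node_end <- socketConnection("localhost", port, server = FALSE,
                             blocking = TRUE, open = "a+b",
                             timeout = setup_timeout)
server_end <- socketAccept(srv, blocking = TRUE, open = "a+b",
                           timeout = setup_timeout)

# The handshake is read while the short timeout is in effect.
serialize("handshake", server_end)
hs <- unserialize(node_end)

# Once the handshake has succeeded, switch to the long timeout.
socketTimeout(node_end, long_timeout)
socketTimeout(server_end, long_timeout)

for (con in list(node_end, server_end, srv)) close(con)
```

Without a way to change the timeout after creation, the node would have to choose between a short timeout for the whole session and a 30-day wait for a handshake that may never arrive.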

Parallel PSOCK cluster setup

In R-devel, the new parallel cluster setup is used by default for homogeneous clusters with all nodes running on localhost, when all nodes are started automatically. The original sequential startup code is still supported for the other cases. There is a new cluster option setup_strategy with values "parallel" and "sequential". "parallel" is the default and tells R to use the parallel strategy on all clusters where it is supported.
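For example, the sequential strategy could be requested for a single cluster as sketched below. This assumes an R version that includes the new option and that the option is accepted as an argument of makePSOCKcluster like other cluster options. The cluster size of 2 is arbitrary.

```r
library(parallel)

# Request the original sequential startup for this cluster only
cl <- makePSOCKcluster(2, setup_strategy = "sequential")
n_nodes <- length(cl)
stopCluster(cl)
```

This may be useful for comparing the two strategies when investigating a startup problem.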

R socket layer improvements

Several issues of the R socket layer implementation were fixed as part of this work.

The connection timeout with socketConnection(server = FALSE) on Linux is now enforced. Before, the call was accidentally blocking due to Linux-specific behavior of select().

socketConnection(server = FALSE) on Windows now returns immediately when the connection fails. Before, one had to wait for a timeout to expire due to Windows-specific behavior of select() when waiting for a connection.

socketConnection(server = FALSE) now detects when a connection is available right away without waiting (probably unlikely and only possible on a localhost) and returns it. Previously, R would wait anyway and attempt to connect again, possibly leaking the connection.

socketConnection(server = TRUE) (and socketAccept()) now enforce the connection timeout. Previously, they could block indefinitely due to a race condition in which select() reports a connection as available to be accepted, but the connection is reset by the client by the time accept() is called. On some systems, accept() would then block. On others (we triggered this on Solaris), accept() would then fail. After the change, R will keep waiting for a good connection while respecting the timeout.

Read operation from a socket is now robust against spurious readability of the socket (select() reports it as readable, but then e.g. due to an invalid checksum recv() would block). This problem may happen on Linux.

Write operation to a socket is now robust against spurious writeability of the socket. Previously, this case could lead to unpredictable behavior. However, this kind of select() bug has not been reported on any system to our knowledge.

The blocking = FALSE argument on a socket connection (socketConnection()) is now respected also on the server side of a socket. Previously, read/write operations on the server end of a socket were accidentally blocking even with blocking = FALSE.

The internal handling of status codes from socket operations on Windows has been updated for WinSock2.
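As an illustration of the blocking = FALSE fix listed above, the following single-process sketch reads from the server end of a non-blocking connection before and after data has been sent. It assumes an R version that includes these changes. The port and the 5s timeouts are arbitrary.

```r
port <- 11238L                   # arbitrary port, assumed to be free
srv <- serverSocket(port)
cli <- socketConnection("localhost", port, server = FALSE,
                        blocking = TRUE, open = "a+", timeout = 5)

# Server end of the connection, opened in non-blocking mode
srv_end <- socketAccept(srv, blocking = FALSE, open = "a+", timeout = 5)

# Nothing has been sent yet: a non-blocking read returns without waiting
nothing_yet <- readLines(srv_end, n = 1)

writeLines("ping", cli)
socketSelect(list(srv_end), timeout = 5)   # wait until data is readable
line <- readLines(srv_end, n = 1)

for (con in list(cli, srv_end, srv)) close(con)
```

With the earlier behavior described above, the first readLines call on the server end would have waited for data instead of returning an empty result.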