source: branches/stable/CORE/arb_cs.cxx

Last change on this file was 17396, checked in by westram, 6 years ago
File size: 16.2 KB
Line 
1// ============================================================= //
2//                                                               //
3//   File      : arb_cs.cxx                                      //
4//   Purpose   : Basics for client/server communication          //
5//                                                               //
6//   Coded by Ralf Westram (coder@reallysoft.de) in March 2011   //
7//   Institute of Microbiology (Technical University Munich)     //
8//   http://www.arb-home.de/                                     //
9//                                                               //
10// ============================================================= //
11
12#include "arb_cs.h"
13#include "arb_msg.h"
14#include "arb_pattern.h"
15#include "arb_string.h"
16
17#include <smartptr.h>
18
19#include <unistd.h>
20#include <netdb.h>
21#include <sys/types.h>
22#include <sys/socket.h>
23#include <sys/un.h>
24#include <sys/stat.h>
25#include <netinet/tcp.h>
26
27// We need one of the below to prevent SIGPIPE on writes to
28// closed socket. For systems that have neither (Solaris),
29// we'd need to implement ignoring the signal in the write
30// loop (not done).
31#ifndef SO_NOSIGPIPE
32#ifndef MSG_NOSIGNAL
33#error Neither SO_NOSIGPIPE nor MSG_NOSIGNAL available!
34#endif
35#endif
36
37static void arb_gethostbyname(const char *name, struct hostent *& he, GB_ERROR& err) {
38    he = gethostbyname(name);
39    // Note: gethostbyname is marked obsolete.
40    // replacement getnameinfo seems less portable atm.
41
42    if (he) {
43        err = NULp;
44    }
45    else {
46        err = GBS_global_string("Cannot resolve hostname: '%s' (h_errno=%i='%s')",
47                                name, h_errno, hstrerror(h_errno));
48    }
49}
50
51const char *arb_gethostname() {
52    static SmartCharPtr hostname;
53    if (hostname.isNull()) {
54        char buffer[4096];
55        gethostname(buffer, 4095);
56        hostname = ARB_strdup(buffer);
57    }
58    return &*hostname;
59}
60
61size_t arb_socket_read(int socket, char* ptr, size_t size) {
62    size_t to_read = size;
63    while(to_read) {
64        ssize_t read_len = read(socket, ptr, to_read);
65        if (read_len <= 0) { // read failed
66            // FIXME: GB_export_error!
67            return 0;
68        }
69        ptr += read_len;
70        to_read -= read_len;
71    }
72    return size;
73}
74
75ssize_t arb_socket_write(int socket, const char* ptr, size_t size) {
76    size_t to_write = size;
77
78    while (to_write) {
79#ifdef MSG_NOSIGNAL
80        // Linux has MSG_NOSIGNAL, but not SO_NOSIGPIPE
81        // prevent SIGPIPE here
82        ssize_t write_len = send(socket, ptr, to_write, MSG_NOSIGNAL);
83        // Note: if valgrind warns about uninitialized bytes sent,
84        // one common reason are parameters passed as int (instead of long).
85        // Affected functions are aisc_put, aisc_nput and aisc_create.
86#else
87        ssize_t write_len = write(socket, ptr, to_write);
88#endif
89        if (write_len <= 0) { // write failed
90            if (errno == EPIPE) {
91                fputs("pipe broken\n", stderr);
92            }
93
94            // FIXME: GB_export_error!
95            return -1;
96        }
97        ptr += write_len;
98        to_write -= write_len;
99    }
100    return 0;
101}
102
103static GB_ERROR arb_open_unix_socket(char* name, bool do_connect, int *fd);
104static GB_ERROR arb_open_tcp_socket(char* name, bool do_connect, int *fd);
105
106/**
107 * Opens and prepares a socket
108 *
109 * If @param name begins with ":", the remainder is shell expanded and
110 * a unix socket is created. If @param contains no ":" it must be numeric,
111 * giving the TCPport number to open. If @param contains a ":" in the middle,
112 * the first part is considered the hostname and the latter part the port.
113 *
114 * @param  name          name of port   {[<host>:]<port>|:<filename>}
115 * @param  do_connect    connect if true (client), otherwise bind (server)
116 * @param  *fd           file descriptor of opened socket (out) or 0 (never returns <0!)
117 * @param  filename_out  filename of unix socket (out)
118 *                       must be NULp or allocated (will be freed)
119 *
120 * @result NULp if all went fine     -> *fd>0
121 *         "" if could not connect   -> *fd==0
122 *         otherwise error message   -> *fd==0
123 */
124GB_ERROR arb_open_socket(const char* name, bool do_connect, int *fd, char** filename_out) {
125    GB_ERROR error = NULp;
126    *fd            = 0;
127
128    if (!name || strlen(name) == 0) {
129        error = "Error opening socket: empty name";
130    }
131    else if (name[0] == ':') {
132        // expand variables in path
133        char *filename    = arb_shell_expand(name+1);
134        error             = GB_incur_error();
135        if (!error) error = arb_open_unix_socket(filename, do_connect, fd);
136
137        if (error) {
138            free(filename);
139        }
140        else {
141            reassign(*filename_out, filename);
142        }
143    } 
144    else {
145        char *socket_name = ARB_strdup(name);
146        error = arb_open_tcp_socket(socket_name, do_connect, fd);
147        free(socket_name);
148        freenull(*filename_out);
149    }
150
151    if (error) {
152        *fd = 0;
153    }
154    else {
155        arb_assert(*fd>0);
156    }
157
158    return error;
159}
160
161static GB_ERROR arb_open_unix_socket(char* filename, bool do_connect, int *fd) {
162    GB_ERROR error = NULp;
163
164    // create structure for connect/bind
165    sockaddr_un unix_socket;
166    unix_socket.sun_family = AF_UNIX;
167    if (strlen(filename)+1 > sizeof(unix_socket.sun_path)) {
168        error = GBS_global_string("Failed to create unix socket: "
169                                  "\"%s\" is longer than the allowed %zu characters",
170                                  filename, sizeof(unix_socket.sun_path));
171    }
172    else {
173        strncpy(unix_socket.sun_path, filename, sizeof(unix_socket.sun_path));
174
175        // create socket
176        *fd = socket(PF_UNIX, SOCK_STREAM, 0);
177        if (*fd < 0) {
178            error = GBS_global_string("Failed to create unix socket: %s", strerror(errno));
179        }
180        else {
181            // connect or bind socket
182            if (do_connect) {
183                if (connect(*fd, (sockaddr*)&unix_socket, sizeof(sockaddr_un))) {
184                    if (errno == ECONNREFUSED || errno == ENOENT) {
185                        error = "";
186                    }
187                    else {
188                        error = GBS_global_string("Failed to connect unix socket \"%s\": %s (%i)",
189                                                  filename, strerror(errno), errno);
190                    }
191                }
192            }
193            else {
194                struct stat stt;
195                if (!stat(filename, &stt)) {
196                    if (!S_ISSOCK(stt.st_mode)) {
197                        error = GBS_global_string("Failed to create unix socket at \"%s\": file exists"
198                                                  " and is not a socket", filename);
199                    }
200                    else if (unlink(filename)) {
201                        error = GBS_global_string("Failed to create unix socket at \"%s\": cannot remove"
202                                                  " existing socket", filename);
203                    }
204                }
205                if (!error && bind(*fd, (sockaddr*)&unix_socket, sizeof(sockaddr_un))) {
206                    error = GBS_global_string("Failed to bind unix socket \"%s\": %s",
207                                              filename, strerror(errno));
208                }
209            }
210
211#ifdef SO_NOSIGPIPE
212            if (!error) {
213                // OSX has SO_NOSIGPIPE but not MSG_NOSIGNAL
214                // prevent SIGPIPE here:
215                int one = 1;
216                if (setsockopt(*fd, SOL_SOCKET, SO_NOSIGPIPE, (const char *)&one, sizeof(one))){
217                    fprintf(stderr, "Warning: setsockopt(...NOSIGPIPE...) failed: %s", strerror(errno));
218                }
219            }
220#endif
221
222            if (error) {
223                close(*fd);
224                *fd = -1;
225            }
226        }
227    }
228
229    return error;
230}
231
232static GB_ERROR arb_open_tcp_socket(char* name, bool do_connect, int *fd) {
233    GB_ERROR error = NULp;
234
235    // create socket
236    *fd = socket(PF_INET, SOCK_STREAM, 0);
237    if (*fd < 0) {
238        error = GBS_global_string("Failed to create tcp socket: %s", strerror(errno));
239    }
240    else {
241        // create sockaddr struct
242        sockaddr_in tcp_socket;
243        tcp_socket.sin_family = AF_INET;
244
245        struct hostent *he;
246        // get port and host
247        char *p = strchr(name, ':');
248        if (!p) { // <port>
249            tcp_socket.sin_port = htons(atoi(name));
250            arb_gethostbyname(arb_gethostname(), he, error);
251        }
252        else { // <host>:<port>
253            tcp_socket.sin_port = htons(atoi(p+1));
254            p[0]='\0';
255            arb_gethostbyname(name, he, error);
256            p[0]=':';
257        }
258        if (tcp_socket.sin_port == 0) {
259            error = "Cannot open tcp socket on port 0. Is the port name malformed?";
260        }
261        if (!error) {
262            memcpy(&tcp_socket.sin_addr, he->h_addr_list[0], he->h_length);
263
264            int one = 1;
265            if (do_connect) {
266                if (connect(*fd, (sockaddr*)&tcp_socket, sizeof(tcp_socket))) {
267                    if (errno == ECONNREFUSED) {
268                        error = "";
269                    } else {
270                        error = GBS_global_string("Failed to connect TCP socket \"%s\": %s",
271                                                  name, strerror(errno));
272                    }
273                }
274            }
275            else { // no connect (bind)
276                if (setsockopt(*fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one))) {
277                    fprintf(stderr, "Warning: setsockopt(...REUSEADDR...) failed: %s", strerror(errno));
278                }
279                if (bind(*fd, (sockaddr*)&tcp_socket, sizeof(tcp_socket))) {
280                    error = GBS_global_string("Failed to bind TCP socket \"%s\": %s",
281                                              name, strerror(errno));
282                }
283            }
284
285            if (setsockopt(*fd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one))) {
286                fprintf(stderr, "Warning: setsockopt(...TCP_NODELAY...) failed: %s", strerror(errno));
287            }
288        }
289
290        if (error) {
291            close(*fd);
292            *fd = -1;
293        }
294    }
295    return error;
296}
297
298////////// UNIT TESTS ///////////
299
300#ifdef UNIT_TESTS
301
302#ifndef TEST_UNIT_H
303# include <test_unit.h>
304#endif
305#include <sys/wait.h>
306#include <arb_sleep.h>
307
308static int echo_server(const char* portname) {
309    int mypid = fork();
310    if (mypid) return mypid;
311   
312    int fd;
313    char *filename = NULp;
314    GB_ERROR error = arb_open_socket(portname, false, &fd, &filename);
315    if (error) {
316        exit(1);
317    }
318
319    if (listen(fd, 1)) {
320        exit(2);
321    }
322
323    {
324        int cli_fd = accept(fd, NULp, NULp);
325        if (cli_fd < 0) {
326            exit(3);
327        }
328
329        char buf[500];
330        ssize_t n;
331        while(1) {
332            n = sizeof(buf);
333            n = arb_socket_read(cli_fd, buf, n);
334            if (n == 0) break;
335            n = arb_socket_write(cli_fd, buf, n);
336            if (n == -1) break;;
337            if (strcmp(buf, "exit") == 0) break;
338        }
339
340        close(cli_fd);
341    }
342
343    close(fd);
344    if (filename) {
345        unlink(filename);
346        free(filename);
347    }
348   
349    exit(0);
350}
351
352#if !defined(DARWIN)
353// TEST_DISABLED_OSX: this test may fail randomly (always timeouts under OSX)
354void TEST_open_socket() {
355    int fd;
356    char *filename = NULp;
357    int server_pid, server_status;
358
359    const int XTRABUF = 20; // silences buffer overflow warnings
360
361    // set up port names
362    char *unix_socket = arb_shell_expand(":$ARBHOME/UNIT_TESTER/sok/test.socket");
363    char tcp_socket[sizeof("65536")+XTRABUF], tcp_socket2[sizeof("localhost:65536")+XTRABUF];
364    {
365        // select port to use for tcp_sockets randomly to reduce probability
366        // of conflicts between parallel builds on same host
367        srand(time(NULp)+getpid());
368
369        const int RANGE    = 100;
370        const int PORT_MIN = 32039;
371        const int PORT_MAX = PORT_MIN+RANGE-1;
372
373        int order[RANGE]; // create random order of numbers in [0..RANGE-1]
374        {
375            int pos[RANGE];
376            for (int p = 0; p<RANGE; ++p) pos[p] = p;
377
378            for (int o = 0; o<RANGE; ++o) {
379                int limit = RANGE-o;
380                int take  = (rand()*double(limit))/RAND_MAX;
381                TEST_EXPECT(take>=0 && take<limit);
382                order[o]  = pos[take];
383                for (int t = take+1; t<limit; ++t) {
384                    pos[t-1] = pos[t];
385                }
386            }
387        }
388
389        int port = -1;
390        for (int o = 0; o<RANGE; o++) {
391            port = PORT_MIN+order[o];
392            TEST_EXPECT(port>=PORT_MIN && port<=PORT_MAX);
393
394            snprintf(tcp_socket, sizeof(tcp_socket), "%i", port);
395            const char *err = arb_open_socket(tcp_socket, true, &fd, &filename);
396            if (!err) { // connected
397                TEST_EXPECT_EQUAL(close(fd), 0);
398            }
399            else if (err[0] == '\0') { // could not connect
400                // found a free socket
401                break;
402            }
403            else { // other error
404                TEST_EXPECT_NULL(err);
405            }
406        }
407        TEST_REJECT(port == -1);
408        snprintf(tcp_socket2, sizeof(tcp_socket2), "localhost:%i", port);
409    }
410
411    // Test opening server sockets
412    TEST_EXPECT_NULL(arb_open_socket(tcp_socket, false, &fd, &filename));
413    TEST_EXPECT(fd>0);
414    TEST_EXPECT_NULL(filename);
415    TEST_EXPECT_EQUAL(close(fd), 0);
416
417    TEST_EXPECT_NULL(arb_open_socket(tcp_socket2, false, &fd, &filename));
418    TEST_EXPECT(fd>0);
419    TEST_EXPECT_NULL(filename);
420    TEST_EXPECT_EQUAL(close(fd), 0);
421
422    TEST_EXPECT_NULL(arb_open_socket(unix_socket, false, &fd, &filename));
423    TEST_EXPECT(fd>0);
424    TEST_REJECT_NULL(filename);
425    TEST_EXPECT_EQUAL(close(fd), 0);
426    TEST_EXPECT_EQUAL(unlink(filename), 0);
427    freenull(filename);
428
429    // Test connecting to existing tcp socket
430    server_pid = echo_server(tcp_socket);
431    TEST_REJECT_ZERO(server_pid);
432
433    {
434        ARB_timeout maxconnect(10, SEC);
435        GB_ERROR    res = "";
436
437        while (!maxconnect.passed()) {
438            res = arb_open_socket(tcp_socket, true, &fd, &filename);
439            if (!res || res[0]) break; // accept 'could not connect'
440            ARB_sleep(30, MS);
441        }
442        TEST_EXPECT_NULL(res); // randomly failed (in older revisions)
443    }
444    TEST_EXPECT(fd>0);
445    TEST_EXPECT_NULL(filename);
446    TEST_EXPECT_EQUAL(close(fd), 0);
447    TEST_EXPECT_EQUAL(server_pid, waitpid(server_pid, &server_status, 0));
448
449    // Test connecting to closed socket
450    TEST_EXPECT_EQUAL("", arb_open_socket(tcp_socket, true, &fd, &filename));
451    TEST_EXPECT_EQUAL("", arb_open_socket(unix_socket, true, &fd, &filename));
452   
453    // Test connecting to existing unix socket
454    server_pid = echo_server(unix_socket);
455    TEST_REJECT_ZERO(server_pid);
456    {
457        ARB_timeout maxconnect(10, SEC);
458        GB_ERROR    res = "";
459
460        while (!maxconnect.passed()) {
461            res = arb_open_socket(unix_socket, true, &fd, &filename);
462            if (!res || res[0]) break; // accept 'could not connect'
463            ARB_sleep(30, MS);
464        }
465        TEST_EXPECT_NULL(res); // randomly failed (in older revisions)
466    }
467    TEST_EXPECT(fd>0);
468
469    // Test read/write
470    char send_buf[500], recv_buf[500];
471    for (unsigned int i=0; i < sizeof(send_buf); i++) {
472        send_buf[i]=i % 64 + '0';
473    }
474    send_buf[sizeof(send_buf)-1]='\0';
475
476    TEST_EXPECT_NULL(arb_socket_write(fd, send_buf, sizeof(send_buf)));
477    TEST_EXPECT_EQUAL(sizeof(recv_buf), arb_socket_read(fd, recv_buf, sizeof(recv_buf)));
478    TEST_EXPECT_EQUAL(send_buf, recv_buf);
479    TEST_EXPECT_NULL(arb_socket_write(fd, send_buf, sizeof(send_buf)));
480    TEST_EXPECT_EQUAL(sizeof(recv_buf), arb_socket_read(fd, recv_buf, sizeof(recv_buf)));
481    TEST_EXPECT_EQUAL(send_buf, recv_buf);
482
483    // Test sigpipe (writing to closed socket)
484    // tell server to die:
485    strcpy(send_buf, "exit"); 
486    TEST_EXPECT_NULL(arb_socket_write(fd, send_buf, sizeof(send_buf)));
487    TEST_EXPECT_EQUAL(sizeof(recv_buf), arb_socket_read(fd, recv_buf, sizeof(recv_buf)));
488    // wait for server to die
489    TEST_EXPECT_EQUAL(server_pid, waitpid(server_pid, &server_status, 0));
490    // try writing to closed pipe
491    TEST_EXPECT_EQUAL(-1, arb_socket_write(fd, send_buf, sizeof(send_buf)));
492
493    TEST_EXPECT_EQUAL(close(fd), 0);
494    freenull(filename);
495   
496    free(unix_socket);
497}
498TEST_PUBLISH(TEST_open_socket);
499
500#endif
501
502#endif // UNIT_TESTS
503
504
Note: See TracBrowser for help on using the repository browser.