4 * Copyright (C) 2008 Adam Williams <broadcast at earthling dot net>
5 * Copyright (C) 2003-2016 Cinelerra CV contributors
7 * This program is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation; either version 2 of the License, or
10 * (at your option) any later version.
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
17 * You should have received a copy of the GNU General Public License
18 * along with this program; if not, write to the Free Software
19 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
24 #include "bcsignals.h"
27 #include "condition.h"
30 #include "filesystem.h"
35 #include "packagedispatcher.h"
36 #include "preferences.h"
38 #include "renderfarm.h"
39 #include "renderfarmclient.h"
41 #include "transportque.h"
44 #include <arpa/inet.h>
48 #include <netinet/in.h>
52 #include <sys/socket.h>
53 #include <sys/types.h>
59 #define AF_FILE AF_LOCAL
// Builds the render farm server: copies the supplied collaborators into the
// object's members and allocates the mutex stored in client_lock.
// NOTE(review): several parameter lines are not visible in this view; the
// assignments below also read mwindow, use_local_rate, result_return,
// default_asset and brender, which are presumably parameters too -- confirm.
63 RenderFarmServer::RenderFarmServer(
65 PackageDispatcher *packages,
66 Preferences *preferences,
69 int64_t *total_return,
70 Mutex *total_return_lock,
75 this->mwindow = mwindow;
76 this->packages = packages;
77 this->preferences = preferences;
78 this->use_local_rate = use_local_rate;
79 this->result_return = result_return;
// total_return is only stored here; set_progress() adds to it while holding
// total_return_lock.
80 this->total_return = total_return;
81 this->total_return_lock = total_return_lock;
82 this->default_asset = default_asset;
84 this->brender = brender;
// start_clients() holds this mutex while it creates and starts each client.
85 client_lock = new Mutex("RenderFarmServer::client_lock");
// Destructor: empties the clients list through remove_all_objects().
// NOTE(review): presumably that call also deletes every RenderFarmServerThread
// it holds -- confirm against the container's implementation. No release of
// client_lock is visible in this view; verify it is freed.
88 RenderFarmServer::~RenderFarmServer()
90 clients.remove_all_objects();
94 // Open connections to clients.
// Creates one RenderFarmServerThread per enabled node (the count comes from
// preferences->get_enabled_nodes()), appends it to clients and calls its
// start_loop(). The loop stops as soon as result becomes nonzero, so nodes
// after the first failure are not started.
95 int RenderFarmServer::start_clients()
99 for( int i=0; i<preferences->get_enabled_nodes() && !result; ++i ) {
// client_lock is held while the client is created, registered and started.
100 client_lock->lock("RenderFarmServer::start_clients");
101 RenderFarmServerThread *client = new RenderFarmServerThread(this, i);
102 clients.append(client);
// The client stays in the list even when start_loop() reports a failure.
104 result = client->start_loop();
105 client_lock->unlock();
111 // The render farm must wait for all the clients to finish.
// The only visible action is emptying the clients list.
// NOTE(review): presumably destroying each RenderFarmServerThread joins its
// thread (the thread constructor calls Thread::set_synchronous(1)) -- confirm.
112 int RenderFarmServer::wait_clients()
114 //printf("RenderFarmServer::wait_clients 1\n");
115 clients.remove_all_objects();
116 //printf("RenderFarmServer::wait_clients 2\n");
// Counts the entries of clients whose running() is true, accumulating the
// count in n.
// NOTE(review): client_lock is not taken in the visible lines; confirm callers
// cannot race with start_clients() or wait_clients().
120 int RenderFarmServer::active_clients()
123 for( int i=0; i<clients.size(); ++i )
124 if( clients[i]->running() ) ++n;
128 // Waits for requests from every client.
129 // Joins when the client is finished.
// Constructor: remembers the owning server and this client's node number,
// and starts with a zero frames_per_second.
// NOTE(review): the remaining parameter line and some member initialisations
// are not visible in this view.
130 RenderFarmServerThread::RenderFarmServerThread(RenderFarmServer *server,
134 this->server = server;
135 this->number = number;
137 frames_per_second = 0;
// Presumably marks the thread as joinable so its owner can wait for it --
// confirm against the Thread class.
141 Thread::set_synchronous(1);
// Destructor: closes the node socket if one is open. A negative socket_fd
// means "no socket" (open_client() stores -1 on failure).
// NOTE(review): other cleanup lines are not visible in this view.
146 RenderFarmServerThread::~RenderFarmServerThread()
149 if( socket_fd >= 0 ) close(socket_fd);
// Opens a stream socket to a render node and stores the descriptor in
// socket_fd.
//   hostname  a leading '/' selects a UNIX-domain socket at that path;
//             anything else is resolved with gethostbyname() and connected
//             over IPv4.
//   port      TCP port, used only on the IPv4 path.
// socket_fd is reset to -1 whenever result is nonzero. The return statement is
// not visible in this view; start_loop() treats a negative return as failure.
156 int RenderFarmServerThread::open_client(const char *hostname, int port)
161 // Open file for master node
162 if( hostname[0] == '/' ) {
163 if( (socket_fd = socket(PF_UNIX, SOCK_STREAM, 0)) < 0 ) {
// NOTE(review): perror() appends ": <error text>" itself, so the trailing
// "\n" in this message splits the output across two lines.
164 perror(_("RenderFarmServerThread::open_client: socket\n"));
168 struct sockaddr_un addr;
169 addr.sun_family = AF_FILE;
// NOTE(review): unbounded copy -- a hostname longer than sizeof(addr.sun_path)
// overflows addr. Confirm the path length is validated before this point.
170 strcpy(addr.sun_path, hostname);
// Address length handed to connect(): offset of sun_path plus the path and
// its terminating NUL.
171 int size = (offsetof(struct sockaddr_un, sun_path) +
172 strlen(hostname) + 1);
174 // The master node is always created by BRender. Keep trying for 30 seconds.
// Delay between connection attempts, in microseconds (passed to usleep()).
176 #define ATTEMPT_DELAY 100000
182 if( connect(socket_fd, (struct sockaddr*)&addr, size) < 0 ) {
// 30000000 / ATTEMPT_DELAY = 300 attempts of 0.1 s each, i.e. about 30 s.
184 if( attempt > 30000000 / ATTEMPT_DELAY ) {
185 fprintf(stderr, _("RenderFarmServerThread::open_client: %s: %s\n"),
191 usleep(ATTEMPT_DELAY);
195 }while(!result && !done);
// Network node: TCP over IPv4.
201 if( (socket_fd = socket(PF_INET, SOCK_STREAM, 0)) < 0 ) {
// NOTE(review): the message names start_loop although this is open_client.
202 perror(_("RenderFarmServerThread::start_loop: socket"));
207 struct sockaddr_in addr;
208 struct hostent *hostinfo;
209 addr.sin_family = AF_INET;
210 addr.sin_port = htons(port);
// NOTE(review): gethostbyname() is obsolescent, IPv4-only here and returns
// static storage; getaddrinfo() is the usual replacement.
211 hostinfo = gethostbyname(hostname);
212 if( hostinfo == NULL ) {
213 fprintf(stderr, _("RenderFarmServerThread::open_client: unknown host %s.\n"),
// Uses the first address returned for the host.
218 addr.sin_addr = *(struct in_addr *) hostinfo->h_addr;
220 if( connect(socket_fd, (struct sockaddr*)&addr, sizeof(addr)) < 0 ) {
221 fprintf(stderr, _("RenderFarmServerThread::open_client: %s: %s\n"),
// NOTE(review): no close() of the failed descriptor is visible before it is
// overwritten with -1; confirm it is not leaked.
230 if( result ) socket_fd = -1;
// Connects to this client's node and, on success, starts the service thread.
// The host name and port for node `number` come from the server's
// preferences. A negative descriptor from open_client() sets result to 1, and
// Thread::start() runs only when result is zero.
235 int RenderFarmServerThread::start_loop()
238 socket_fd = open_client(server->preferences->get_node_hostname(number),
239 server->preferences->get_node_port(number));
240 if( socket_fd < 0 ) result = 1;
// A watchdog is created only for a positive timeout. It is constructed with
// this thread as its server and a null client.
243 int watchdog_timeout = server->preferences->renderfarm_watchdog_timeout;
244 if( watchdog_timeout > 0 ) {
245 watchdog = new RenderFarmWatchdog(watchdog_timeout, this, 0);
250 if( !result ) Thread::start();
// Reads sizeof(int64_t) bytes from the socket and assembles them into a
// 64-bit value, most significant byte first.
//   error  optional out-parameter; when null a local temporary is used
//          instead. It receives the outcome of comparing read_socket()'s
//          return value (the right-hand operand is not visible in this view).
265 int64_t RenderFarmServerThread::read_int64(int *error)
268 if( !error ) error = &temp;
270 unsigned char data[sizeof(int64_t)];
271 *error = (read_socket((char*)data, sizeof(int64_t)) !=
274 // Make it return 1 if error so it can be used to read a result code from the
// NOTE(review): byte 0 is widened through signed int64_t while the other bytes
// use uint64_t; shifting a value >= 0x80 into the sign bit is not portable
// before C++20. Consider uint64_t for consistency.
278 result = (((int64_t)data[0]) << 56) |
279 (((uint64_t)data[1]) << 48) |
280 (((uint64_t)data[2]) << 40) |
281 (((uint64_t)data[3]) << 32) |
282 (((uint64_t)data[4]) << 24) |
283 (((uint64_t)data[5]) << 16) |
284 (((uint64_t)data[6]) << 8) |
// Serialises value into 8 bytes, most significant byte first (the mirror of
// read_int64()), and sends them with write_socket().
// The return value is a comparison against write_socket()'s result; its
// right-hand operand is not visible in this view.
290 int RenderFarmServerThread::write_int64(int64_t value)
292 unsigned char data[sizeof(int64_t)];
293 data[0] = (value >> 56) & 0xff;
294 data[1] = (value >> 48) & 0xff;
295 data[2] = (value >> 40) & 0xff;
296 data[3] = (value >> 32) & 0xff;
297 data[4] = (value >> 24) & 0xff;
298 data[5] = (value >> 16) & 0xff;
299 data[6] = (value >> 8) & 0xff;
300 data[7] = value & 0xff;
301 return (write_socket((char*)data, sizeof(int64_t)) !=
// Reads from socket_fd into data, looping while bytes remain (len > 0) and
// the last read() did not report an error (bytes_read >= 0). The watchdog's
// begin_request()/end_request() calls bracket the read.
// NOTE(review): start_loop() allocates watchdog only when the timeout is
// positive; confirm a null check guards the two watchdog calls (the
// neighbouring lines are not visible in this view).
307 int RenderFarmServerThread::read_socket(char *data, int len)
311 //printf("RenderFarmServerThread::read_socket 1\n");
313 watchdog->begin_request();
314 while(len > 0 && bytes_read >= 0)
// Each read continues at the current offset.
317 bytes_read = read(socket_fd, data + offset, len);
319 if( bytes_read > 0 ) {
321 offset += bytes_read;
328 watchdog->end_request();
329 //printf("RenderFarmServerThread::read_socket 10\n");
// Sends len bytes from data to socket_fd with a single write() call and keeps
// write()'s return value in result.
// NOTE(review): write() may transfer fewer than len bytes or fail with EINTR;
// unlike read_socket() there is no retry loop here. Confirm that callers
// comparing the result with the requested length are meant to treat a short
// write as an error.
334 int RenderFarmServerThread::write_socket(char *data, int len)
336 //printf("RenderFarmServerThread::write_socket 1\n");
337 int result = write(socket_fd, data, len);
338 //printf("RenderFarmServerThread::write_socket 10\n");
// Ensures the request buffer can hold size bytes.
343 void RenderFarmServerThread::reallocate_buffer(int size)
// Existing buffer is too small. The body of this branch is not visible in
// this view; presumably it releases the buffer so the block below allocates a
// new one -- confirm.
345 if( buffer && buffer_allocated < size ) {
// No buffer yet and a nonzero size requested: allocate exactly size bytes.
350 if( !buffer && size ) {
351 buffer = new unsigned char[size];
352 buffer_allocated = size;
// Thread body: tells the node to run the package renderer, then services its
// requests. Each request is a 5-byte header (byte 0 = request id, bytes 1-4 =
// payload size, most significant byte first) followed by the payload, which
// is read into buffer and dispatched on the request id.
// NOTE(review): the loop construct and its exit statements are not visible in
// this view.
356 void RenderFarmServerThread::run()
359 unsigned char header[5];
365 buffer_allocated = 0;
366 // fs_server = new RenderFarmFSServer(this);
367 // fs_server->initialize();
370 // Send command to run package renderer.
// The return value of write_int64() is not checked here.
371 write_int64(RENDERFARM_PACKAGES);
377 // Wait for requests.
378 // Requests consist of request ID's and accompanying buffers.
380 bytes_read = read_socket((char*)header, 5);
381 //printf("RenderFarmServerThread::run 1\n");
// Anything other than a complete header is handled in this branch (its body
// is not visible in this view).
382 if( bytes_read != 5 ) {
387 int request_id = header[0];
388 int64_t request_size = (((u_int32_t)header[1]) << 24) |
389 (((u_int32_t)header[2]) << 16) |
390 (((u_int32_t)header[3]) << 8) |
391 (u_int32_t)header[4];
// NOTE(review): request_size comes from the peer and is narrowed to int by
// reallocate_buffer() and read_socket(); no upper bound check is visible.
393 reallocate_buffer(request_size);
395 // Get accompanying buffer
396 bytes_read = read_socket((char*)buffer, request_size);
398 //printf("RenderFarmServerThread::run 2 %d %jd %d\n", request_id, request_size, bytes_read);
399 if( bytes_read != request_size ) {
403 //printf("RenderFarmServerThread::run 3\n");
// Dispatch. Cases with no statement shown below have bodies that are not
// visible in this view.
405 switch( request_id ) {
406 case RENDERFARM_PREFERENCES:
410 case RENDERFARM_ASSET:
418 case RENDERFARM_PACKAGE:
419 send_package(buffer);
422 case RENDERFARM_PROGRESS:
423 set_progress(buffer);
426 case RENDERFARM_SET_RESULT:
430 case RENDERFARM_SET_VMAP:
431 set_video_map(buffer);
434 case RENDERFARM_GET_RESULT:
438 case RENDERFARM_DONE:
439 //printf("RenderFarmServerThread::run 10\n");
443 case RENDERFARM_KEEPALIVE:
447 // if( fs_server->handle_request(request_id, request_size, (unsigned char*)buffer) ) break;
// Unrecognised ids are only reported.
448 printf(_("RenderFarmServerThread::run: unknown request %02x\n"), request_id);
451 //printf("RenderFarmServerThread::run 10 %d %jd\n", request_id, request_size);
453 //printf("RenderFarmServerThread::run 20\n");
455 // Don't let watchdog kill the entire renderfarm when a client finishes normally.
456 delete watchdog; watchdog = 0;
// Sends a NUL-terminated string to the node as one datagram: a 4-byte prefix
// followed by the string bytes including the terminating NUL.
// NOTE(review): the lines that fill the prefix and advance i are not visible
// in this view; presumably the prefix holds len -- confirm. Also verify that
// datagram is released after the write.
460 void RenderFarmServerThread::write_string(char *string)
// len counts the terminating NUL.
465 len = strlen(string) + 1;
466 datagram = new char[len + 4];
468 memcpy(datagram + i, string, len);
// The result of write_socket() is not checked.
469 write_socket((char*)datagram, len + 4);
470 //printf("RenderFarmServerThread::write_string %02x%02x%02x%02x\n",
471 // datagram[0], datagram[1], datagram[2], datagram[3]);
// Sends the server's preferences to the node: they are saved into a defaults
// object, serialised to a string and sent with write_string().
// NOTE(review): the declarations of defaults and string, and any release of
// string, are not visible in this view.
477 void RenderFarmServerThread::send_preferences()
482 server->preferences->save_defaults(&defaults);
483 defaults.save_string(string);
484 write_string(string);
// Sends the default asset to the node as two strings, in this order: the
// defaults form (string1), then the form written into file.
// NOTE(review): the declarations of defaults, string1 and file are not
// visible in this view.
488 void RenderFarmServerThread::send_asset()
493 // The asset must be sent in two segments.
494 // One segment is stored in the EDL and contains decoding information.
495 // One segment is stored in the asset and contains encoding information.
// First segment: the asset saved as defaults, then serialised to string1.
496 server->default_asset->save_defaults(&defaults, 0, 1, 1, 1, 1, 1);
497 defaults.save_string(string1);
// Second segment: the asset written into file and terminated as a string.
499 server->default_asset->write(&file, 0, 0);
500 file.terminate_string();
502 write_string(string1);
503 write_string(file.string());
// Sends the server's EDL to the node: it is saved as XML into file,
// terminated as a string and sent with write_string().
// NOTE(review): the declaration of file is not visible in this view.
508 void RenderFarmServerThread::send_edl()
513 server->edl->save_xml(&file, 0);
514 file.terminate_string();
515 //printf("RenderFarmServerThread::send_edl\n%s\n\n", file.string);
517 write_string(file.string());
518 //printf("RenderFarmServerThread::send_edl 2\n");
// Handles a package request: records the rate reported by the node, asks the
// dispatcher for the next package and sends its description back.
//   buffer  request payload; bytes 0-3 are a 32-bit value, most significant
//           byte first, from which frames_per_second is derived (the divisor
//           is not visible in this view).
522 void RenderFarmServerThread::send_package(unsigned char *buffer)
524 this->frames_per_second = (double)((((u_int32_t)buffer[0]) << 24) |
525 (((u_int32_t)buffer[1]) << 16) |
526 (((u_int32_t)buffer[2]) << 8) |
527 ((u_int32_t)buffer[3])) /
530 //printf("RenderFarmServerThread::send_package 1 %f\n", frames_per_second);
// The dispatcher receives the reported rate and the server's use_local_rate
// flag (one argument line is not visible in this view).
531 RenderPackage *package =
532 server->packages->get_package(frames_per_second,
534 server->use_local_rate);
536 //printf("RenderFarmServerThread::send_package 2\n");
// NOTE(review): fixed BCTEXTLEN buffer; the path copy and the fields stored
// after it have no visible bounds check. Also confirm datagram is released.
537 datagram = new char[BCTEXTLEN];
541 //printf("RenderFarmServerThread::send_package 1\n");
// Four zero bytes are sent as the whole reply -- presumably the "no package
// available" case; the guarding condition is not visible in this view.
542 datagram[0] = datagram[1] = datagram[2] = datagram[3] = 0;
543 write_socket(datagram, 4);
548 //printf("RenderFarmServerThread::send_package 10\n");
// Package path is copied in at offset i, which then advances by its length.
550 strcpy(&datagram[i], package->path);
551 i += strlen(package->path);
// Fields follow in this fixed order. STORE_INT32 is defined outside this view.
554 STORE_INT32(package->audio_start);
555 STORE_INT32(package->audio_end);
556 STORE_INT32(package->video_start);
557 STORE_INT32(package->video_end);
// brender is sent as a 0/1 flag.
558 int use_brender = (server->brender ? 1 : 0);
559 STORE_INT32(use_brender);
560 STORE_INT32(package->audio_do);
561 STORE_INT32(package->video_do);
// Presumably writes the payload length (total minus the 4-byte prefix) into
// the prefix -- confirm against the lines not visible here.
565 STORE_INT32(len - 4);
567 write_socket(datagram, len);
// Handles a progress report from the node.
//   buffer  bytes 0-3: a 32-bit value (most significant byte first) that is
//           added to *server->total_return;
//           bytes 4-7: a 32-bit value from which frames_per_second is derived
//           (the divisor is not visible in this view).
574 void RenderFarmServerThread::set_progress(unsigned char *buffer)
// The shared total is updated under total_return_lock.
576 server->total_return_lock->lock("RenderFarmServerThread::set_progress");
577 *server->total_return += (int64_t)(((u_int32_t)buffer[0]) << 24) |
578 (((u_int32_t)buffer[1]) << 16) |
579 (((u_int32_t)buffer[2]) << 8) |
580 ((u_int32_t)buffer[3]);
581 frames_per_second = (double)((((u_int32_t)buffer[4]) << 24) |
582 (((u_int32_t)buffer[5]) << 16) |
583 (((u_int32_t)buffer[6]) << 8) |
584 ((u_int32_t)buffer[7])) /
586 server->total_return_lock->unlock();
// Record this node's rate in the server's preferences.
588 server->preferences->set_rate(frames_per_second, number);
590 // This locks the preferences
// Done only when the server has an mwindow.
591 if( server->mwindow ) server->mwindow->preferences->copy_rates_from(
592 server->preferences);
// Handles a video-map update. When the server has a brender object, two
// 32-bit values (most significant byte first) are decoded from buffer bytes
// 0-3 and 4-7 and passed to brender->set_video_map(), then a one-byte reply
// is written to the node.
// NOTE(review): the line that sets return_value[0] and this function's return
// statement are not visible in this view.
595 int RenderFarmServerThread::set_video_map(unsigned char *buffer)
597 if( server->brender ) {
598 server->brender->set_video_map((int64_t)(((u_int32_t)buffer[0]) << 24) |
599 (((u_int32_t)buffer[1]) << 16) |
600 (((u_int32_t)buffer[2]) << 8) |
601 ((u_int32_t)buffer[3]),
602 (int64_t)(((u_int32_t)buffer[4]) << 24) |
603 (((u_int32_t)buffer[5]) << 16) |
604 (((u_int32_t)buffer[6]) << 8) |
605 ((u_int32_t)buffer[7]));
606 char return_value[1];
608 write_socket(return_value, 1);
// Records a result code reported for this render.
//   buffer  byte 0 is the result code.
// The shared result is overwritten only while it is still zero, so once a
// nonzero value has been stored later reports do not replace it.
// NOTE(review): no lock is taken in the visible lines although the watchdog
// thread also calls this function; confirm that is acceptable.
615 void RenderFarmServerThread::set_result(unsigned char *buffer)
617 //printf("RenderFarmServerThread::set_result %p\n", buffer);
618 if( !*server->result_return )
619 *server->result_return = buffer[0];
// Sends the current shared result code to the node as a single byte. The
// value of *server->result_return is truncated to unsigned char.
623 void RenderFarmServerThread::get_result()
625 unsigned char data[1];
626 data[0] = *server->result_return;
627 write_socket((char*)data, 1);
// Builds a watchdog for either a server thread or a client thread.
//   timeout_secs  timeout in seconds; stored in microseconds.
//   server        server thread to act on, or null.
//   client        client thread to act on, or null (start_loop() passes 0).
643 RenderFarmWatchdog::RenderFarmWatchdog(int timeout_secs,
644 RenderFarmServerThread *server,
645 RenderFarmClientThread *client)
// NOTE(review): the multiplication is done in int, so with a 32-bit int it
// overflows for timeout_secs above 2147 -- confirm the range, or widen first.
648 this->timeout_usecs = timeout_secs * 1000000;
649 this->server = server;
650 this->client = client;
// Both conditions are created with an initial value of 0.
// run() waits on them; begin_request() and end_request() unlock them.
651 next_request = new Condition(0, "RenderFarmWatchdog::next_request", 0);
652 request_complete = new Condition(0, "RenderFarmWatchdog::request_complete", 0);
// Destructor: unlocks both conditions, then deletes request_complete.
// NOTE(review): presumably the unlocks release run() from its waits so the
// thread can be stopped -- confirm. The thread shutdown lines and the delete
// of next_request are not visible in this view.
656 RenderFarmWatchdog::~RenderFarmWatchdog()
659 next_request->unlock();
660 request_complete->unlock();
663 delete request_complete;
// Signals the start of a guarded request by unlocking next_request, the
// condition run() waits on before it begins timing.
666 void RenderFarmWatchdog::begin_request()
668 next_request->unlock();
// Signals the end of a guarded request by unlocking request_complete, the
// condition run() waits on with a timeout.
671 void RenderFarmWatchdog::end_request()
673 request_complete->unlock();
// Watchdog thread body: waits for a request to begin, then waits up to
// timeout_usecs for it to complete.
// NOTE(review): the loop and the conditions selecting the branches below are
// not visible in this view; presumably they run when timed_lock() reports a
// timeout, with one branch for a client watchdog and one for a server
// watchdog -- confirm.
676 void RenderFarmWatchdog::run()
679 next_request->lock("RenderFarmWatchdog::run");
680 int result = request_complete->timed_lock(timeout_usecs, "RenderFarmWatchdog::run");
681 //printf("RenderFarmWatchdog::run 1 %d\n", result);
// Client side: the client process is killed with SIGKILL.
685 printf("RenderFarmWatchdog::run 1 killing client pid %d\n", client->pid);
686 kill(client->pid, SIGKILL);
// Server side: a result is recorded through the server thread's set_result().
// The line that fills buffer is not visible in this view.
689 printf("RenderFarmWatchdog::run 1 killing server thread %p\n", server);
691 unsigned char buffer[4];
693 server->set_result(buffer);