module game_core.job_updater; import bubel.ecs.std; import bubel.ecs.vector; import bubel.ecs.atomic; import bubel.ecs.manager; import mmutils.thread_pool; version(LDC) { import ldc.attributes; } else { struct optStrategy { string strategy; } } version(Android) { alias pthread_key_t = uint; extern (C) int pthread_key_create(pthread_key_t *, void* function(void *)) @nogc nothrow; extern (C) int pthread_key_delete(pthread_key_t) @nogc nothrow; extern (C) void* pthread_getspecific(pthread_key_t) @nogc nothrow; extern (C) int pthread_setspecific(pthread_key_t, const void *) @nogc nothrow; } else version(WebAssembly) { alias pthread_key_t = uint; extern (C) int pthread_key_create(pthread_key_t *, void* function(void *)) @nogc nothrow; extern (C) int pthread_key_delete(pthread_key_t) @nogc nothrow; extern (C) void* pthread_getspecific(pthread_key_t) @nogc nothrow; extern (C) int pthread_setspecific(pthread_key_t, const void *) @nogc nothrow; extern (C) void emscripten_main_thread_process_queued_calls(); } struct ECSJobUpdater { this(uint threads) { onCreate(threads); } ~this() { //wait for end of jobs pool.waitThreads(); //dispose jobs array if(jobs)Mallocator.dispose(jobs); //free TLS data version(WebAssembly)pthread_key_delete(tls_key); else version(Android)pthread_key_delete(tls_key); } struct Group { ~this() nothrow { } JobsGroup group; //each group can have up to 128 jobs JobData[128] jobs; JobCaller[128] callers; uint count = 0; string name; //mmutils.ThreadPool uses system of dependency where dependencies are added for child groups. //Parent group has atomic counter and after completition it will add job groups dependant on it. void dependantOn(Group* dependency) { group.dependantOn(&dependency.group); } //add group to pool void start() { group.thPool.addGroupAsynchronous(&group); } //add jobs slice to group structure void build(ThreadPool* pool) { group.thPool = pool; group.jobs = jobs[0..count]; } //clear jobs void clear() { group = JobsGroup("name",null); count = 0; } //add single job to group void add(JobCaller caller) { callers[count] = caller; jobs[count] = JobData(&callers[count].callJob,name); count++; } } //initialize thread pool and data void onCreate(uint threads_count) { //create TLS for Android and WebAsssembly version(WebAssembly)pthread_key_create(&tls_key, null); else version(Android)pthread_key_create(&tls_key, null); pool.initialize(); thread_data = pool.registerExternalThread(); pool.setThreadsNum(threads_count); jobs = Mallocator.makeArray!Group(256); } //this function are providingn ThreadID to ECS. BubelECS is expecting ThreadID to be linear ID in range (0;ThreadsCount) uint getThreadID() @nogc nothrow { version(WebAssembly)return cast(int)pthread_getspecific(tls_key); else version(Android)return cast(int)pthread_getspecific(tls_key); else return thread_id; } //clear jobs data void begin() { call_jobs.clear(); foreach(ref job;jobs) { job.clear(); } last_job.clear(); } //execute jobs void call() { //if there is no work return if(last_job.group.getDependenciesWaitCount() == 0)return; if(call_jobs.length == 0)return; //set last job groupEndJobs[0] = JobData(&releaseMainThread, "Stop Threads", null, null); //add job to group last_job.group.jobs = groupEndJobs; //set thread pool pointer last_job.group.thPool = &pool; //last job should be called on main thread. It prevent some issues with death loops. last_job.group.executeOnThreadNum = 0; //start jobs without dependencies foreach(job;call_jobs) { job.start(); } //add main thread to pool. It will be released in last job. thread_data.threadStartFunc(); } //callback that will release main thread void releaseMainThread(ThreadData* th_data, JobData* data) { pool.releaseExternalThreads(); } static struct JobCaller { //ECS job EntityManager.Job* job; //pointer to parent ECSJobUpdater* updater; //job ID uint id; //called by external thread void callJob(ThreadData* th_data, JobData* data) { version(WebAssembly) { pthread_setspecific(tls_key, cast(void*)th_data.threadId); if(th_data.threadId == 0) { //this emscripten call is required to make multithreading working emscripten_main_thread_process_queued_calls(); job.execute(); emscripten_main_thread_process_queued_calls(); } else job.execute(); } else version(Android) { pthread_setspecific(tls_key, cast(void*)th_data.threadId); job.execute(); } else { //set thread id updater.thread_id = th_data.threadId; //execture job. It's the function from BubelECS job.execute(); } } } //this is callback passed to EntityManager. EntityManager will call this for every jobs group. Every system will generate one group. void dispatch(EntityManager.JobGroup group) { //check if group isn't empty if(group.jobs.length == 0) { return; } //add name for job. Used for traces. jobs[group.id].name = cast(string)group.caller.system.name; //add jobs to group foreach(ref job;group.jobs) { uint index = 0; if(job.callers.length)index = job.callers[0].system_id; JobCaller caller; caller.updater = &this; caller.job = &job; caller.id = index; jobs[group.id].add(caller); } //build group jobs[group.id].build(&pool); uint deps = cast(uint)group.dependencies.length; //add dependencies foreach(dep;group.dependencies) { if(jobs[dep.id].count && dep.caller.system.willExecute && dep.caller.system.enabled)jobs[group.id].dependantOn(&jobs[dep.id]); else deps--; } //set as job without dependencies if it hasn't any if(deps == 0) { call_jobs.add(&jobs[group.id]); } //last job is dependant on all jobs so it will be called after everything will be finished last_job.dependantOn(&jobs[group.id]); } //Webassembly version works properly only when there is no thread local data (static variables). //Because of that I'm using pthread tls instead of D. TLS is used only for storing ThreadID version(WebAssembly) { __gshared pthread_key_t tls_key; } else version(Android) { __gshared pthread_key_t tls_key; } else static uint thread_id = 0; //thread pool ThreadPool pool; //thread data used for main thread ThreadData* thread_data; //array of jobs Group[] jobs; //list of jobs which should be called on frame start as they have no dependencies Vector!(Group*) call_jobs; //last job group is used for releasing main thread from pool Group last_job; //last_job group has one job JobData[1] groupEndJobs; }