module game_core.job_updater;

import bubel.ecs.std;
import bubel.ecs.vector;
import bubel.ecs.atomic;

import ecs_utils.utils;

import bubel.ecs.manager;

import mmutils.thread_pool;

version(LDC)
{
    import ldc.attributes;
}
else
{
    /// Fallback declaration used when `ldc.attributes` is not imported, so
    /// that the `@optStrategy(...)` annotation below still has a symbol to
    /// refer to.
    struct optStrategy
    {
        string strategy;
    }
}

/**
 * Bridges ECS job groups (EntityManager.JobGroup) to an mmutils ThreadPool.
 *
 * Usage visible in this file: `begin()` resets the per-frame state,
 * `dispatch()` is called once per ECS job group to fill a `Group` slot and
 * wire its dependencies, and `call()` starts every group that has no active
 * dependency and then runs `thread_data.threadStartFunc()` on the calling
 * thread. `last_job` is made dependant on every dispatched group and carries
 * a single job, `releaseMainThread`, which calls
 * `pool.releaseExternalThreads()`.
 *
 * NOTE(review): presumably `threadStartFunc()` blocks the calling thread in
 * the pool's worker loop until `releaseExternalThreads()` is invoked -
 * confirm against mmutils.thread_pool.
 *
 * NOTE(review): `JobCaller.updater` stores the address of this struct and
 * `Group.group.jobs` slices into the group's own storage, so instances
 * should presumably not be copied or moved after `dispatch()` - confirm.
 */
struct ECSJobUpdater
{
    /// Builds the updater and configures the pool with `threads` threads
    /// (see `onCreate`).
    this(uint threads)
    {
        onCreate(threads);
    }

    /// Waits for the pool threads, frees the group table and, on
    /// WebAssembly, deletes the TLS key created in `onCreate`.
    ~this()
    {
        pool.waitThreads();

        if(jobs)
        {
            Mallocator.dispose(jobs);
        }

        version(WebAssembly)
        {
            pthread_key_delete(tls_key);
        }
    }

    version(WebAssembly)
    {
        /// pthread key under which each thread stores its pool thread id.
        __gshared pthread_key_t tls_key;
    }
    else
    {
        /// Pool thread id of the current thread; written by
        /// `JobCaller.callJob`. Declared `static` without `shared`/`__gshared`.
        static uint thread_id = 0;
    }

    ThreadPool pool;
    /// Handle returned by `pool.registerExternalThread()` in `onCreate`.
    ThreadData* thread_data;

    int job_id = 0;
    int no_dep_count = 0;

    /**
     * One schedulable unit: a `JobsGroup` plus fixed-size backing storage
     * for up to 1024 jobs and the callers they delegate to.
     */
    struct Group
    {
        ~this() nothrow
        {
        }

        JobsGroup group;
        JobData[1024] jobs;
        JobCaller[1024] callers;
        /// Number of used entries in `jobs` / `callers`.
        uint count = 0;
        /// Name passed to every `JobData` created by `add`.
        string name;

        /// Makes this group's `JobsGroup` dependant on `dependency`'s.
        void dependantOn(Group* dependency)
        {
            group.dependantOn(&dependency.group);
        }

        /// Hands the group to the pool stored in `group.thPool`.
        void start()
        {
            group.thPool.addGroupAsynchronous(&group);
        }

        /// Binds the group to `pool` and exposes the first `count` jobs.
        void build(ThreadPool* pool)
        {
            group.thPool = pool;
            group.jobs = jobs[0..count];
        }

        /// Replaces the `JobsGroup` with a fresh one and forgets all jobs.
        void clear()
        {
            group = JobsGroup("name",null);
            count = 0;
        }

        /// Appends `caller` and a `JobData` that invokes its `callJob`.
        void add(JobCaller caller)
        {
            const slot = count;

            callers[slot] = caller;
            // The delegate must point at the stored copy, not the parameter.
            jobs[slot] = JobData(&callers[slot].callJob, name);

            count = slot + 1;
        }
    }

    /// Group table indexed by `EntityManager.JobGroup.id` (256 entries,
    /// allocated in `onCreate`).
    Group[] jobs;
    /// Groups without active dependencies; started by `call()`.
    Vector!(Group*) call_jobs;
    /// Terminal group that depends on every dispatched group.
    Group last_job;
    /// Storage for the single job run by `last_job`.
    JobData[1] groupEndJobs;

    /**
     * Initializes the pool, registers the calling thread as an external
     * thread, sets the pool's thread count and allocates the group table.
     *
     * Params:
     *     threads_count = value forwarded to `pool.setThreadsNum`
     */
    void onCreate(uint threads_count)
    {
        version(WebAssembly)
        {
            pthread_key_create(&tls_key, null);
        }

        pool.initialize();
        thread_data = pool.registerExternalThread();
        pool.setThreadsNum(threads_count);

        jobs = Mallocator.makeArray!Group(256);
    }

    /// Returns the thread id last recorded for the current thread by
    /// `JobCaller.callJob` (TLS slot on WebAssembly, `thread_id` otherwise).
    uint getThreadID() @nogc nothrow
    {
        version(WebAssembly)
        {
            return cast(int)pthread_getspecific(tls_key);
        }
        else
        {
            return thread_id;
        }
    }

    /// Resets per-frame state: clears the start list, every group slot and
    /// the terminal group.
    void begin()
    {
        job_id = 0;
        call_jobs.clear();

        foreach(i; 0 .. jobs.length)
        {
            jobs[i].clear();
        }

        last_job.clear();
    }

    /// Currently a no-op; the tracker storage it used to reset is disabled.
    void clearTracker()
    {
    }

    /// Trivial body annotated with `@optStrategy("none")`; not called from
    /// any live code in this file.
    @optStrategy("none")
    void nop()
    {
        int counter;
        counter++;
    }

    /**
     * Starts all dispatched work and runs `thread_data.threadStartFunc()`.
     *
     * Returns immediately when `last_job` has no pending dependencies or
     * when no group was queued for starting.
     */
    void call()
    {
        if(last_job.group.getDependenciesWaitCount() == 0 || call_jobs.length == 0)
        {
            return;
        }

        // The terminal group runs exactly one job that releases the
        // external (calling) thread again.
        groupEndJobs[0] = JobData(&releaseMainThread, "Stop Threads", null, null);

        last_job.group.jobs = groupEndJobs;
        last_job.group.thPool = &pool;
        last_job.group.executeOnThreadNum = 0;

        foreach(root; call_jobs)
        {
            root.start();
        }

        thread_data.threadStartFunc();
    }

    /// Job body of `last_job`: releases the pool's external threads.
    void releaseMainThread(ThreadData* th_data, JobData* data)
    {
        pool.releaseExternalThreads();
    }

    /**
     * Adapter between a pool job and an ECS job: records the executing
     * thread's id for `getThreadID` and then runs `job.execute()`.
     */
    static struct JobCaller
    {
        EntityManager.Job* job;
        ECSJobUpdater* updater;
        uint id;

        /// Pool entry point; `data` is unused.
        void callJob(ThreadData* th_data, JobData* data)
        {
            version(WebAssembly)
            {
                pthread_setspecific(tls_key, cast(void*)th_data.threadId);

                // On thread 0 the queued main-thread calls are processed
                // right before and right after the job.
                const onThreadZero = th_data.threadId == 0;

                if(onThreadZero)
                {
                    emscripten_main_thread_process_queued_calls();
                }

                job.execute();

                if(onThreadZero)
                {
                    emscripten_main_thread_process_queued_calls();
                }
            }
            else
            {
                ECSJobUpdater.thread_id = th_data.threadId;
                job.execute();
            }
        }
    }

    /**
     * Registers one ECS job group for the next `call()`.
     *
     * Fills the slot `jobs[group.id]` with one `JobCaller` per job, builds
     * it against `pool`, and links it to each dependency whose slot has jobs
     * and whose system both `willExecute` and is `enabled`. Groups left with
     * no such dependency are queued in `call_jobs`. `last_job` is always made
     * dependant on the slot.
     *
     * Groups without jobs are ignored.
     */
    void dispatch(EntityManager.JobGroup group)
    {
        if(!group.jobs.length)
        {
            return;
        }

        Group* target = &jobs[group.id];
        target.name = cast(string)group.caller.system.name;

        foreach(ref job; group.jobs)
        {
            uint systemId = 0;
            if(job.callers.length)
            {
                systemId = job.callers[0].system_id;
            }

            JobCaller caller;
            caller.updater = &this;
            caller.job = &job;
            caller.id = systemId;

            target.add(caller);
        }

        target.build(&pool);

        // Count only dependencies that will actually run this frame.
        uint activeDeps = 0;

        foreach(dep; group.dependencies)
        {
            Group* upstream = &jobs[dep.id];
            const willRun = upstream.count && dep.caller.system.willExecute && dep.caller.system.enabled;

            if(!willRun)
            {
                continue;
            }

            target.dependantOn(upstream);
            activeDeps++;
        }

        if(activeDeps == 0)
        {
            call_jobs.add(target);
        }

        last_job.dependantOn(target);
    }
}