From d891be190c7de1dd9214d271c136f3f0252f5b24 Mon Sep 17 00:00:00 2001
From: Kyle McLamb
Date: Wed, 19 Dec 2018 20:46:03 -0500
Subject: [PATCH] Split event serialization work across threads

Fixes #6.

This is an opt-in feature: to enable it, call prof.enableThreadedWrite()
at the start of your program. Then, instead of saving each event on the
main thread and doing all the serialization work there, each event is
assigned to one of a pool of worker threads, which serialize their
events in chunks at the end of the program.

Potential improvements to this model could include:

* Writing serialized data to a byte buffer instead of a string. This
  would save the cost of copying the chunk string between VMs, with the
  added complexity of handling ownership of the buffer and potentially
  having data that grows beyond the size of the buffer.
* Incremental serialization. Right now the worker threads wait until the
  end of the program to start processing their events, but there's no
  reason they can't do that work ahead of time in the background if it
  doesn't affect the runtime of the main program (it might?).
* Handling file I/O on a background thread, or even possibly in the
  worker threads themselves. Haven't thought about this one too much,
  because prof.write() is typically called at the end of the program,
  where there's not much else going on.
---
 jprof.lua                 | 83 +++++++++++++++++++++++++++++++++------
 serializeWorkerThread.lua | 42 ++++++++++++++++++++
 2 files changed, 114 insertions(+), 11 deletions(-)
 create mode 100644 serializeWorkerThread.lua

diff --git a/jprof.lua b/jprof.lua
index 705c2cf..a874d6a 100644
--- a/jprof.lua
+++ b/jprof.lua
@@ -23,6 +23,7 @@ local profiler = {}
 -- since no allocations/deallocations are triggered by them anymore
 local zoneStack = {nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil}
 
+local profDataNumEvents = 0
 local profData = {}
 local netBuffer = nil
 local profEnabled = true
@@ -31,6 +32,12 @@ local profEnabled = true
 -- to measure the jprof-less (i.e. "real") memory consumption
 local profMem = 0
 
+-- threaded write stuff
+local isThreaded = false
+local numThreads = nil
+local chunkSize = nil
+local eventChannels = {}
+
 local function getByte(n, byte)
     return bit.rshift(bit.band(n, bit.lshift(0xff, 8*byte)), 8*byte)
 end
@@ -39,8 +46,7 @@ end
 -- the file in chunks. If we attempt to pack a big table, the amount of memory
 -- used during packing can exceed the luajit memory limit pretty quickly, which will
 -- terminate the program before the file is written.
-local function msgpackListIntoFile(list, file)
-    local n = #list
+local function msgpackListIntoFile(n, file)
     -- https://github.com/msgpack/msgpack/blob/master/spec.md#array-format-family
     if n < 16 then
         file:write(string.char(144 + n))
@@ -51,15 +57,46 @@ local function msgpackListIntoFile(list, file)
     else
         error("List too big")
     end
-    for _, elem in ipairs(list) do
-        file:write(msgpack.pack(elem))
-    end
+
+    if isThreaded then
+        local DONE = true
+        for _, channel in ipairs(eventChannels) do
+            channel:supply(DONE)
+        end
+
+        local channelIdx = 1
+        -- iterate for each chunk, rounded up to account for the last
+        -- potentially incomplete chunk
+        local numChunks = math.ceil(n/chunkSize)
+        for _ = 1, numChunks do
+            local chunkStr = eventChannels[channelIdx]:demand()
+            file:write(chunkStr)
+            channelIdx = channelIdx % numThreads + 1
+        end
+    else
+        for _, event in ipairs(profData) do
+            file:write(msgpack.pack(event))
+        end
+    end
 end
 
+local profDataNumEventsInChunk = 0
+local currentChannelIndex = 1
 local function addEvent(name, memCount, annot)
-    local event = {name, love.timer.getTime(), memCount, annot}
+    local time = love.timer.getTime()
+    local event = {name, time, memCount, annot}
     if profData then
-        table.insert(profData, event)
+        profDataNumEvents = profDataNumEvents + 1
+        if isThreaded then
+            eventChannels[currentChannelIndex]:push(event)
+            profDataNumEventsInChunk = profDataNumEventsInChunk + 1
+            if profDataNumEventsInChunk == chunkSize then
+                currentChannelIndex = currentChannelIndex % numThreads + 1
+                profDataNumEventsInChunk = 0
+            end
+        else
+            profData[profDataNumEvents] = event
+        end
     end
     if netBuffer then
         table.insert(netBuffer, event)
@@ -84,7 +121,11 @@ if PROF_CAPTURE then
         -- if the full profiling data is not saved to profData, then only netBuffer will increase the
         -- memory used by jprof and all of it will be freed for garbage collection at some point, so that
         -- we should probably not try to keep track of it at all
-        if profData then
+        --
+        -- Ditto for threaded write support: All event storage is squirrelled
+        -- away into worker threads, so we don't actually increase the memory
+        -- toll on the "main" thread VM.
+        if profData and not isThreaded then
             profMem = profMem + (collectgarbage("count") - memCount)
         end
     end
@@ -103,7 +144,7 @@ if PROF_CAPTURE then
         if profiler.socket and #zoneStack == 0 then
             profiler.netFlush()
         end
-        if profData then
+        if profData and not isThreaded then
             profMem = profMem + (collectgarbage("count") - memCount)
         end
     end
@@ -114,17 +155,36 @@ if PROF_CAPTURE then
         end
     end
 
+    function profiler.enableThreadedWrite(_numThreads, _chunkSize)
+        assert(profData, "(jprof) profiling disabled (did you call prof.connect()?)")
+        assert(profDataNumEvents == 0, "(jprof) prof.enableThreadedWrite() should be called before creating profile events")
+        isThreaded = true
+        -- I have no evidence that this is the best number of threads, just that it seems ok on my machine
+        numThreads = _numThreads or love.system.getProcessorCount() * 2
+        -- Ditto here, chunk size does not seem to have a huge effect on performance so long as it's not like, 1
+        chunkSize = _chunkSize or 512
+        for i=1, numThreads do
+            local channel = love.thread.newChannel()
+            table.insert(eventChannels, channel)
+            love.thread.newThread("serializeWorkerThread.lua"):start(channel, chunkSize)
+        end
+    end
+
     function profiler.write(filename)
         assert(#zoneStack == 0, "(jprof) Zone stack is not empty")
 
         if not profData then
             print("(jprof) No profiling data saved (probably because you called prof.connect())")
         else
+            print(("(jprof) Saving %d profiled events..."):format(profDataNumEvents))
+            local serializeTime = love.timer.getTime()
             local file, msg = love.filesystem.newFile(filename, "w")
             assert(file, msg)
-            msgpackListIntoFile(profData, file)
+            file:setBuffer('full')
+            msgpackListIntoFile(profDataNumEvents, file)
             file:close()
-            print(("(jprof) Saved profiling data to '%s'"):format(filename))
+            serializeTime = (love.timer.getTime() - serializeTime)
+            print(("(jprof) Saved profiling data to '%s' (%f seconds)"):format(filename, serializeTime))
         end
     end
 
@@ -191,6 +251,7 @@ else
     profiler.push = noop
     profiler.pop = noop
     profiler.write = noop
+    profiler.enableThreadedWrite = noop
     profiler.enabled = noop
     profiler.connect = noop
     profiler.netFlush = noop
diff --git a/serializeWorkerThread.lua b/serializeWorkerThread.lua
new file mode 100644
index 0000000..cfb64ca
--- /dev/null
+++ b/serializeWorkerThread.lua
@@ -0,0 +1,42 @@
+local eventChannel, chunkSize = ...
+
+love.filesystem = require 'love.filesystem'
+local msgpack = require("MessagePack")
+msgpack.set_number("double")
+
+local eventList = {}
+
+-- record events
+local complete = false
+while not complete do
+    local event = eventChannel:demand()
+    if event == true then
+        complete = true
+    else
+        table.insert(eventList, event)
+    end
+end
+
+-- serialize events
+local buf = {}
+local function pushBuf()
+    local str = table.concat(buf)
+    eventChannel:push(str)
+    for i=#buf, 1, -1 do
+        buf[i] = nil
+    end
+end
+
+for _, event in ipairs(eventList) do
+    local str = msgpack.pack(event)
+    table.insert(buf, str)
+    if #buf == chunkSize then
+        pushBuf()
+    end
+end
+
+if #buf ~= 0 then
+    -- push final incomplete chunk
+    -- there should only be one worker that actually runs this
+    pushBuf()
+end
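
For reviewers, a minimal usage sketch of the opt-in API described in the
commit message. This is not part of the patch: the file layout, the
love.load/love.quit placement, the zone names, and the output filename are
only illustrative, and it assumes jprof.lua is on the require path.

    -- main.lua (illustrative)
    PROF_CAPTURE = true            -- must be set before jprof is required
    local prof = require("jprof")

    function love.load()
        -- opt in before any events are recorded; both arguments are optional
        -- (the patch defaults to processor count * 2 threads and 512 events per chunk)
        prof.enableThreadedWrite()
    end

    function love.update(dt)
        prof.push("frame")
        prof.push("update")
        -- game logic goes here
        prof.pop("update")
    end

    function love.draw()
        prof.push("draw")
        -- rendering goes here
        prof.pop("draw")
        prof.pop("frame")
    end

    function love.quit()
        -- with threaded write enabled, this is where the worker threads are
        -- drained and their serialized chunks are written to the file
        prof.write("prof.mpack")
    end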
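
The round-robin scheme relies on an ordering argument that the patch leaves
implicit: addEvent() hands each channel chunkSize consecutive events before
moving on, each worker emits its chunk strings in the order it received its
events, so demanding one chunk per channel round-robin in
msgpackListIntoFile() reproduces the original event order. The standalone
sketch below simulates that argument with plain strings standing in for
msgpack-packed events; none of it is part of the patch.

    -- distribution: chunkSize consecutive events per channel, then move on
    local numThreads, chunkSize = 2, 2
    local events = {"e1", "e2", "e3", "e4", "e5"}
    local channels = {{}, {}}
    local channelIdx, inChunk = 1, 0
    for _, e in ipairs(events) do
        table.insert(channels[channelIdx], e)
        inChunk = inChunk + 1
        if inChunk == chunkSize then
            channelIdx = channelIdx % numThreads + 1
            inChunk = 0
        end
    end

    -- each "worker" turns its events into chunk strings of up to chunkSize events
    local chunks = {}
    for i, list in ipairs(channels) do
        chunks[i] = {}
        for j = 1, #list, chunkSize do
            table.insert(chunks[i], table.concat(list, " ", j, math.min(j + chunkSize - 1, #list)))
        end
    end

    -- collection: demand one chunk per channel, round-robin, numChunks times
    local out, nextChunk = {}, {1, 1}
    channelIdx = 1
    for _ = 1, math.ceil(#events / chunkSize) do
        table.insert(out, chunks[channelIdx][nextChunk[channelIdx]])
        nextChunk[channelIdx] = nextChunk[channelIdx] + 1
        channelIdx = channelIdx % numThreads + 1
    end
    print(table.concat(out, " "))  --> e1 e2 e3 e4 e5 (original order preserved)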