Skip to content
Snippets Groups Projects
Commit e646209d authored by Roman Gershman's avatar Roman Gershman
Browse files

Stability fixes to PUBSUB code

parent 746c04fb
No related branches found
Tags v0.2.0
No related merge requests found
......@@ -64,7 +64,11 @@ jobs:
update-alternatives --install /usr/bin/gcc gcc /usr/bin/gcc-9 40 \
--slave /usr/bin/g++ g++ /usr/bin/g++-9
run: |
# Work around https://github.com/actions/checkout/issues/766
git config --global --add safe.directory /src
cd /src
git describe --always --tags ${{ github.sha }}
if [ -d build-opt ]; then
chown -R root build-opt
ls -l ./build-opt
......@@ -96,6 +100,12 @@ jobs:
submodules: true
- name: Build artifacts
run: |
# Work around https://github.com/actions/checkout/issues/766
git config --global --add safe.directory "$GITHUB_WORKSPACE"
git describe --always --tags ${{ github.sha }}
VERSION=$(git describe --always --tags ${{ github.sha }})
echo "::set-output name=version::${VERSION}"
./tools/release.sh
- name: Upload
uses: actions/upload-artifact@v3
......
......@@ -31,7 +31,9 @@ if(ENABLE_GIT_VERSION)
else()
set(GIT_CLEAN_DIRTY "")
endif()
Message(STATUS "GIT_SHA1 ${GIT_SHA1}")
git_describe(GIT_VER --always)
Message(STATUS "GIT_VER ${GIT_VER}")
string(TIMESTAMP PRJ_BUILD_TIME "%Y-%m-%d %H:%M:%S" UTC)
else(ENABLE_GIT_VERSION)
set(GIT_VER "dev")
......
......@@ -8,6 +8,8 @@ extern "C" {
#include "redis/util.h"
}
#include "base/logging.h"
namespace dfly {
using namespace std;
......@@ -72,7 +74,10 @@ auto ChannelSlice::FetchSubscribers(string_view channel) -> vector<Subscriber> {
void ChannelSlice::CopySubscribers(const SubscribeMap& src, const std::string& pattern,
vector<Subscriber>* dest) {
for (const auto& sub : src) {
Subscriber s(sub.first, sub.second.thread_id);
ConnectionContext* cntx = sub.first;
CHECK(cntx->conn_state.subscribe_info);
Subscriber s(cntx, sub.second.thread_id);
s.pattern = pattern;
s.borrow_token.Inc();
......
......@@ -46,11 +46,6 @@ void ConnectionContext::ChangeSubscription(bool to_add, bool to_reply, CmdArgLis
}
}
if (!to_add && conn_state.subscribe_info->IsEmpty()) {
conn_state.subscribe_info.reset();
force_dispatch = false;
}
sort(channels.begin(), channels.end());
// prepare the array in order to distribute the updates to the shards.
......@@ -90,6 +85,12 @@ void ConnectionContext::ChangeSubscription(bool to_add, bool to_reply, CmdArgLis
// Update subscription
shard_set->RunBriefInParallel(move(cb),
[&](ShardId sid) { return shard_idx[sid + 1] > shard_idx[sid]; });
// It's important to reset
if (!to_add && conn_state.subscribe_info->IsEmpty()) {
conn_state.subscribe_info.reset();
force_dispatch = false;
}
}
if (to_reply) {
......@@ -139,15 +140,10 @@ void ConnectionContext::ChangePSub(bool to_add, bool to_reply, CmdArgList args)
}
}
if (!to_add && conn_state.subscribe_info->IsEmpty()) {
conn_state.subscribe_info.reset();
force_dispatch = false;
}
int32_t tid = util::ProactorBase::GetIndex();
DCHECK_GE(tid, 0);
// Update the subscribers on publisher's side.
// Update the subscribers on channel-slice side.
auto cb = [&](EngineShard* shard) {
ChannelSlice& cs = shard->channel_slice();
for (string_view pattern : patterns) {
......@@ -161,6 +157,13 @@ void ConnectionContext::ChangePSub(bool to_add, bool to_reply, CmdArgList args)
// Update pattern subscription. Run on all shards.
shard_set->RunBriefInParallel(move(cb));
// Important to reset conn_state.subscribe_info only after all references to it were
// removed from channel slices.
if (!to_add && conn_state.subscribe_info->IsEmpty()) {
conn_state.subscribe_info.reset();
force_dispatch = false;
}
}
if (to_reply) {
......@@ -209,7 +212,8 @@ void ConnectionContext::SendSubscriptionChangedResponse(string_view action,
}
void ConnectionContext::OnClose() {
if (!conn_state.subscribe_info) return;
if (!conn_state.subscribe_info)
return;
if (!conn_state.subscribe_info->channels.empty()) {
auto token = conn_state.subscribe_info->borrow_token;
......
......@@ -301,7 +301,7 @@ TEST_F(DflyEngineTest, Hello) {
auto resp = Run({"hello", "2"});
ASSERT_THAT(resp, ArrLen(12));
EXPECT_THAT(resp.GetVec(),
ElementsAre("server", "redis", "version", "df-dev", "proto",
ElementsAre("server", "redis", "version", ArgType(RespExpr::STRING), "proto",
IntArg(2), "id", ArgType(RespExpr::INT64), "mode",
"standalone", "role", "master"));
......
......@@ -897,6 +897,9 @@ void Service::Publish(CmdArgList args, ConnectionContext* cntx) {
auto cb = [&] { return EngineShard::tlocal()->channel_slice().FetchSubscribers(channel); };
// How do we know that subsribers did not disappear after we fetched them?
// Each subscriber object hold a borrow_token.
// ConnectionContext::OnClose does not reset subscribe_info before all tokens are returned.
vector<ChannelSlice::Subscriber> subscriber_arr = shard_set->Await(sid, std::move(cb));
atomic_uint32_t published{0};
......@@ -912,6 +915,8 @@ void Service::Publish(CmdArgList args, ConnectionContext* cntx) {
}
fibers_ext::BlockingCounter bc(subscriber_arr.size());
// We run publish_cb in each subsriber's thread.
auto publish_cb = [&, bc](unsigned idx, util::ProactorBase*) mutable {
unsigned start = slices[idx];
......@@ -921,6 +926,7 @@ void Service::Publish(CmdArgList args, ConnectionContext* cntx) {
break;
published.fetch_add(1, memory_order_relaxed);
facade::Connection* conn = subscriber_arr[i].conn_cntx->owner();
DCHECK(conn);
facade::Connection::PubMessage pmsg;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment