[ PROMPT_NODE_24595 ]
Concurrency
[ SKILL_DOCUMENTATION ]
# Concurrency and Parallel Programming
## Atomics and Memory Ordering
```cpp
#include <atomic>
#include <thread>
// Basic atomics
std::atomic<int> counter{0};
std::atomic<bool> flag{false};
// Memory ordering: release/acquire publication. A consumer that observes
// ready == true is guaranteed to also observe data == 42.
void producer(std::atomic<int>& data, std::atomic<bool>& ready) {
    data.store(42, std::memory_order_relaxed);
    ready.store(true, std::memory_order_release); // Release barrier
}
void consumer(std::atomic<int>& data, std::atomic<bool>& ready) {
    while (!ready.load(std::memory_order_acquire)) { // Acquire barrier
        std::this_thread::yield();
    }
    int value = data.load(std::memory_order_relaxed); // guaranteed 42 here
    (void)value;
}
// Compare-and-swap: returns true iff the flag flipped false -> true.
// On failure, `expected` is overwritten with the observed value.
bool try_acquire_lock(std::atomic<bool>& lock) {
    bool expected = false;
    return lock.compare_exchange_strong(expected, true,
                                        std::memory_order_acquire,
                                        std::memory_order_relaxed);
}
// Fetch-and-add: returns the value held BEFORE the increment.
int increment_counter(std::atomic<int>& counter) {
    return counter.fetch_add(1, std::memory_order_relaxed);
}
```
## Lock-Free Data Structures
```cpp
#include <atomic>
#include <memory>
// Lock-free stack (Treiber stack).
// NOTE(review): pop() deletes nodes immediately, so with concurrent pops
// this has the classic ABA / use-after-free hazard mentioned below;
// production code needs hazard pointers or epoch-based reclamation.
template <typename T>
class LockFreeStack {
    struct Node {
        T data;
        Node* next;
        Node(const T& value) : data(value), next(nullptr) {}
    };
    std::atomic<Node*> head_{nullptr};
public:
    // Pushes a copy of `value`. CAS loop: compare_exchange_weak reloads the
    // current head into new_node->next on failure, so the body is empty.
    void push(const T& value) {
        Node* new_node = new Node(value);
        new_node->next = head_.load(std::memory_order_relaxed);
        while (!head_.compare_exchange_weak(new_node->next, new_node,
                                            std::memory_order_release,
                                            std::memory_order_relaxed)) {
            // Retry with updated head
        }
    }
    // Pops the most recently pushed element into `result`.
    // Returns false when the stack is empty.
    bool pop(T& result) {
        Node* old_head = head_.load(std::memory_order_relaxed);
        while (old_head &&
               !head_.compare_exchange_weak(old_head, old_head->next,
                                            std::memory_order_acquire,
                                            std::memory_order_relaxed)) {
            // Retry
        }
        if (old_head) {
            result = old_head->data;
            delete old_head; // Note: ABA problem exists
            return true;
        }
        return false;
    }
};
// Lock-free queue (single producer, single consumer).
// One slot is always kept empty to distinguish "full" from "empty",
// so the usable capacity is Size - 1.
template <typename T, size_t Size>
class SPSCQueue {
    std::array<T, Size> buffer_;
    // Producer-written and consumer-written indices live on separate
    // cache lines (alignas(64)) to avoid false sharing.
    alignas(64) std::atomic<size_t> head_{0};
    alignas(64) std::atomic<size_t> tail_{0};
public:
    // Producer side only. Returns false when the queue is full.
    bool push(const T& item) {
        size_t head = head_.load(std::memory_order_relaxed);
        size_t next_head = (head + 1) % Size;
        if (next_head == tail_.load(std::memory_order_acquire)) {
            return false; // Queue full
        }
        buffer_[head] = item;
        head_.store(next_head, std::memory_order_release); // publish slot
        return true;
    }
    // Consumer side only. Returns false when the queue is empty.
    bool pop(T& item) {
        size_t tail = tail_.load(std::memory_order_relaxed);
        if (tail == head_.load(std::memory_order_acquire)) {
            return false; // Queue empty
        }
        item = buffer_[tail];
        tail_.store((tail + 1) % Size, std::memory_order_release); // free slot
        return true;
    }
};
```
## Thread Pool
```cpp
#include <condition_variable>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <thread>
#include <vector>
// Fixed-size pool of worker threads draining a shared task queue.
class ThreadPool {
    std::vector<std::thread> workers_;
    std::queue<std::function<void()>> tasks_;
    std::mutex queue_mutex_;
    std::condition_variable condition_;
    bool stop_ = false; // guarded by queue_mutex_
public:
    ThreadPool(size_t num_threads) {
        for (size_t i = 0; i < num_threads; ++i) {
            workers_.emplace_back([this] {
                while (true) {
                    std::function<void()> task;
                    {
                        std::unique_lock<std::mutex> lock(queue_mutex_);
                        condition_.wait(lock, [this] {
                            return stop_ || !tasks_.empty();
                        });
                        // Drain remaining tasks before exiting on stop.
                        if (stop_ && tasks_.empty()) {
                            return;
                        }
                        task = std::move(tasks_.front());
                        tasks_.pop();
                    }
                    task(); // run outside the lock
                }
            });
        }
    }
    // Signals stop, wakes all workers, and joins them. Queued tasks
    // still run to completion before the destructor returns.
    ~ThreadPool() {
        {
            std::unique_lock<std::mutex> lock(queue_mutex_);
            stop_ = true;
        }
        condition_.notify_all();
        for (auto& worker : workers_) {
            worker.join();
        }
    }
    // Schedules f(args...) and returns a future for its result.
    // Throws std::runtime_error if called after the pool stopped.
    template <class F, class... Args>
    auto enqueue(F&& f, Args&&... args)
        -> std::future<std::invoke_result_t<F, Args...>> {
        using return_type = std::invoke_result_t<F, Args...>;
        // shared_ptr because std::function requires a copyable callable.
        auto task = std::make_shared<std::packaged_task<return_type()>>(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
        );
        std::future<return_type> result = task->get_future();
        {
            std::unique_lock<std::mutex> lock(queue_mutex_);
            if (stop_) {
                throw std::runtime_error("enqueue on stopped ThreadPool");
            }
            tasks_.emplace([task]() { (*task)(); });
        }
        condition_.notify_one();
        return result;
    }
};
```
## Parallel STL Algorithms
```cpp
#include <algorithm>
#include <execution>
#include <numeric>
#include <vector>
// Demonstrates the C++17 parallel execution policies.
// NOTE(review): with libstdc++ these policies require linking against TBB.
void parallel_algorithms_demo() {
    std::vector<int> vec(1'000'000);
    std::iota(vec.begin(), vec.end(), 0);
    // Parallel sort
    std::sort(std::execution::par, vec.begin(), vec.end());
    // Parallel for_each (par_unseq additionally permits vectorization)
    std::for_each(std::execution::par_unseq, vec.begin(), vec.end(),
                  [](int& x) { x *= 2; });
    // Parallel transform. Accumulate squares as long long: after doubling,
    // elements reach ~2e6 and int*int would overflow (UB).
    std::vector<long long> result(vec.size());
    std::transform(std::execution::par, vec.begin(), vec.end(),
                   result.begin(),
                   [](int x) { return static_cast<long long>(x) * x; });
    // Parallel reduce. 0LL initial value: the sum (~1e12) overflows int.
    long long sum = std::reduce(std::execution::par,
                                vec.begin(), vec.end(), 0LL);
    // Parallel transform_reduce (map-reduce)
    long long sum_of_squares = std::transform_reduce(
        std::execution::par,
        vec.begin(), vec.end(),
        0LL,
        std::plus<>(),
        [](int x) { return static_cast<long long>(x) * x; }
    );
    (void)sum;
    (void)sum_of_squares;
}
```
## Synchronization Primitives
```cpp
#include <mutex>
#include <shared_mutex>
#include <condition_variable>
// Mutex types
std::mutex mtx;
std::recursive_mutex rec_mtx;   // re-lockable by the owning thread
std::timed_mutex timed_mtx;     // supports try_lock_for / try_lock_until
std::shared_mutex shared_mtx;   // reader-writer lock
// RAII locks: acquired in the constructor, released in the destructor,
// so the mutex is unlocked on every exit path (including exceptions).
void exclusive_access() {
    std::lock_guard<std::mutex> lock(mtx);
    // Critical section
}
// std::unique_lock is movable and supports manual unlock/relock.
void unique_lock_example() {
    std::unique_lock<std::mutex> lock(mtx);
    // Can unlock and relock
    lock.unlock();
    // Do some work
    lock.lock();
}
// Reader-writer lock
class SharedData {
mutable std::shared_mutex mutex_;
std::string data_;
public:
std::string read() const {
std::shared_lock lock(mutex_);
return data_;
}
void write(std::string new_data) {
std::unique_lock lock(mutex_);
data_ = std::move(new_data);
}
};
// Condition variable: blocking FIFO queue of ints.
class Queue {
    std::queue<int> queue_;
    std::mutex mutex_;
    std::condition_variable cv_;
public:
    // Enqueue a value and wake one waiting consumer.
    void push(int value) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push(value);
        }
        cv_.notify_one(); // notify after unlocking to avoid a wasted wakeup
    }
    // Blocks until an element is available, then dequeues it.
    int pop() {
        std::unique_lock<std::mutex> lock(mutex_);
        // Predicate form of wait() guards against spurious wakeups.
        cv_.wait(lock, [this] { return !queue_.empty(); });
        int value = queue_.front();
        queue_.pop();
        return value;
    }
};
// std::scoped_lock - multiple mutexes
// Acquires all given mutexes with a deadlock-avoidance algorithm (same as
// std::lock), so two concurrent opposite-direction transfers cannot deadlock
// regardless of argument order.
// NOTE(review): mtx1/mtx2 below are unused by transfer(); each Account is
// presumed to carry its own `mutex` and `balance` members — defined elsewhere.
std::mutex mtx1, mtx2;
void transfer(Account& from, Account& to, int amount) {
std::scoped_lock lock(from.mutex, to.mutex); // Deadlock-free
from.balance -= amount;
to.balance += amount;
}
```
## Async and Futures
```cpp
#include <future>
#include <thread>
// Placeholder implementations so the examples are self-contained.
inline int expensive_computation() { return 42; }
inline int compute_value() { return 7; }
// std::async: runs the callable (with std::launch::async, on a new thread)
// and returns a future for its result.
int async_demo() {
    auto future = std::async(std::launch::async, []() {
        return expensive_computation();
    });
    // Get result (blocks until ready)
    auto result = future.get();
    return result;
}
// Promise and future: an explicit one-shot channel for a single value.
void producer(std::promise<int> promise) {
    int value = compute_value();
    promise.set_value(value);
}
void consumer(std::future<int> future) {
    int value = future.get(); // blocks until the producer sets the value
    (void)value;
}
void promise_demo() {
    std::promise<int> promise;
    std::future<int> future = promise.get_future();
    std::thread producer_thread(producer, std::move(promise));
    std::thread consumer_thread(consumer, std::move(future));
    // Join both threads — a joinable std::thread destructor terminates.
    producer_thread.join();
    consumer_thread.join();
}
// Packaged task: wraps a callable so its result is delivered via a future.
int packaged_task_demo() {
    std::packaged_task<int(int, int)> task([](int a, int b) {
        return a + b;
    });
    std::future<int> task_future = task.get_future();
    std::thread task_thread(std::move(task), 5, 3);
    int sum = task_future.get(); // 8
    task_thread.join();
    return sum;
}
```
## Coroutine-Based Concurrency
```cpp
#include
#include
// Async task coroutine
template
struct AsyncTask {
struct promise_type {
std::optional value;
std::exception_ptr exception;
AsyncTask get_return_object() {
return AsyncTask{
std::coroutine_handle::from_promise(*this)
};
}
std::suspend_never initial_suspend() { return {}; }
std::suspend_always final_suspend() noexcept { return {}; }
void return_value(T v) {
value = std::move(v);
}
void unhandled_exception() {
exception = std::current_exception();
}
};
std::coroutine_handle handle;
AsyncTask(std::coroutine_handle h) : handle(h) {}
~AsyncTask() { if (handle) handle.destroy(); }
T get() {
if (!handle.done()) {
handle.resume();
}
if (handle.promise().exception) {
std::rethrow_exception(handle.promise().exception);
}
return *handle.promise().value;
}
};
// Usage
AsyncTask async_compute() {
co_return 42;
}
```
## Quick Reference
| Primitive | Use Case | Performance |
|-----------|----------|-------------|
| std::atomic | Simple shared state | Lock-free |
| std::mutex | Exclusive access | Kernel call |
| std::shared_mutex | Read-heavy workload | Faster than mutex when reads dominate |
| Lock-free structures | High contention | Best throughput |
| Thread pool | Task parallelism | Avoid thread overhead |
| Parallel STL | Data parallelism | Automatic scaling |
| std::async | Simple async tasks | Implementation-defined (may reuse threads) |
| Coroutines | Async I/O | Minimal overhead |
## Memory Ordering Guide
| Ordering | Guarantees | Use Case |
|----------|-----------|----------|
| relaxed | No synchronization | Counters |
| acquire | Load barrier | Consumer |
| release | Store barrier | Producer |
| acq_rel | Both | RMW operations |
| seq_cst | Total order | Default |
Source: claude-code-templates (MIT). See About Us for full credits.