folly::Future 简介与使用

#什么是 folly::Future?

folly::Future 是 Meta 开源的 C++ 库 Folly 中提供的一个异步编程工具。它类似于其他语言中的 Future 或 Promise 概念,用于表示一个可能在未来某个时间点完成的异步操作的结果。通过使用 folly::Future,开发者可以更方便地处理异步任务,避免回调地狱,提高代码的可读性和维护性。

folly::Future 相对于标准库中的 std::future,提供了更多的功能和更灵活的接口,例如链式调用、错误处理等。

#基本用法

#1. 创建 Future

#通过 Promise 创建 Future

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
folly::Future<int> asyncOperation() {
folly::Promise<int> promise;
auto future = promise.getFuture();

// 模拟异步操作
std::thread([p = std::move(promise)]() mutable {
std::this_thread::sleep_for(std::chrono::seconds(1));
if (rand() % 2 == 0) {
p.setValue(42); // 设置结果
} else {
p.setException(std::domain_error("Simulated error")); // 设置异常
}
}).detach();

return future;
}

#创建已完成的 Future

1
2
3
4
5
6
folly::Future<int> completedFuture() {
return folly::makeFuture(100); // 立即返回一个已完成的 Future
}
folly::Future<int> failedFuture() {
return folly::makeFuture<int>(std::runtime_error("Error occurred")); // 返回一个失败的 Future
}

#2. 等待 Future

#阻塞等待 Future 完成并获取结果

1
2
3
4
5
6
7
8
9
void waitForFutureResult() {
auto fut = asyncOperation();
try {
int result = std::move(fut).get(); // 阻塞等待结果
std::cout << "Future completed with value: " << result << std::endl;
} catch (const std::exception& e) {
std::cerr << "Future failed with error: " << e.what() << std::endl;
}
}

#阻塞等待 Future 完成

1
2
3
4
5
6
7
8
9
void waitForFutureFinish() {
auto fut = asyncOperation();
try {
std::move(fut).wait(); // 阻塞等待完成
std::cout << "Future completed" << std::endl;
} catch (const std::exception& e) {
std::cerr << "Future failed with error: " << e.what() << std::endl;
}
}

#3. 回调函数

#在 Future 上追加回调函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
folly::Future<int> registerCallback() {
auto fut = asyncOperation();

auto fut2 = std::move(fut)
.thenValue([](int value) {
std::cout << "Future completed with value: " << value << std::endl;
return value * 2;
})
.thenError<std::domain_error>([](std::domain_error &&e) {
std::cerr << "Future failed with domain_error: " << e.what() << std::endl;
return -1;
})
.thenError<std::exception>([](std::exception &&e) {
std::cerr << "Future failed with exception: " << e.what() << std::endl;
return -1;
});

return fut2;
}

追加多个回调函数,实现链式调用

1
2
3
4
5
6
7
8
9
10
11
12
13
void chainFutures() {
asyncOperation()
.thenValue([](int value) {
return value * 2; // 返回新的值
})
.thenValue([](int value) {
return std::to_string(value); // 转换为字符串
})
.thenValue([](const std::string &str) { //
std::cout << "Final result: " << str << std::endl;
})
.wait();
}

#控制回调函数的执行线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
void futureWithExecutor() {
auto main_executor = folly::getGlobalCPUExecutor().get();
folly::CPUThreadPoolExecutor pool_executor(4);

asyncOperation()
.via(main_executor) // 指定后续callback在默认执行器上运行
.thenValue([](int value) {
std::cout << "Callback running on main_executor thread, value: " << value << std::endl;
return value + 1;
})
.via(&pool_executor) // 指定后续callback在pool_executor上运行
.thenValue([](int value) {
std::cout << "Callback running on pool_executor thread, value: " << value << std::endl;
return value + 1;
})
.wait();
}

#4. 组合多个 Future

#等待多个 Future 完成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
void combineFutures() {
auto fut1 = asyncOperation();
auto fut2 = completedFuture();
auto fut3 = failedFuture();

auto combinedFuture =
folly::collectAll(std::array<folly::Future<int>, 3>{std::move(fut1), std::move(fut2), std::move(fut3)})
.via(folly::getGlobalCPUExecutor().get())
.thenValue([](auto &&results) {
int sum = 0;
for (folly::Try<int> &result : results) {
if (result.hasValue()) {
sum += result.value();
}
if (result.hasException()) {
std::cerr << "One of the futures failed: " << result.exception().what() << std::endl;
}
}
return sum;
})
.thenValue([](int sum) {
std::cout << "Sum of successful futures: " << sum << std::endl;
return sum;
});

combinedFuture.wait();
}

#等待第一个完成的 Future

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
void raceFutures() {
auto fut1 = asyncOperation();
auto fut2 = completedFuture();
auto fut3 = failedFuture();

auto raceFuture =
folly::collectAny(std::array<folly::Future<int>, 3>{std::move(fut1), std::move(fut2), std::move(fut3)})
.via(folly::getGlobalCPUExecutor().get())
.thenValue([](std::pair<size_t, folly::Try<int>> result) {
if (result.second.hasValue()) {
std::cout << "First completed future value: " << result.second.value() << std::endl;
} else if (result.second.hasException()) {
std::cerr << "First completed future failed: " << result.second.exception().what() << std::endl;
}
return result;
});
raceFuture.wait();
// First completed future value: 100

auto fut4 = failedFuture();
auto raceFuture2 = folly::collectAny(std::array<folly::Future<int>, 1>{std::move(fut4)})
.via(folly::getGlobalCPUExecutor().get())
.thenValue([](std::pair<size_t, folly::Try<int>> result) {
if (result.second.hasValue()) {
std::cout << "First completed future value: " << result.second.value() << std::endl;
} else if (result.second.hasException()) {
std::cerr << "First completed future failed: " << result.second.exception().what()
<< std::endl;
}
return result;
});
raceFuture2.wait();
// First completed future failed: std::runtime_error: Error occurred
}

#收集成功的 Future 结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
void collectSuccessfulFutures() {
auto fut1 = asyncOperation();
auto fut2 = completedFuture();
auto fut3 = failedFuture();

auto collectedFuture =
folly::collect(std::array<folly::Future<int>, 3>{std::move(fut1), std::move(fut2), std::move(fut3)})
.via(folly::getGlobalCPUExecutor().get())
.thenValue([](std::vector<int> results) {
int sum = 0;
for (int value : results) {
sum += value;
}
std::cout << "Sum of successful futures: " << sum << std::endl;
return sum;
});

collectedFuture.wait();
}

#5. 复杂控制流

#循环重试直到成功

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void retryUntilSuccess(std::function<folly::Future<int>()> operation, int maxRetries) {
std::function<folly::Future<int>(int)> attempt = [&](int retriesLeft) -> folly::Future<int> {
return operation().thenError<std::exception>([&](std::exception &&e) {
if (retriesLeft > 0) {
std::cout << "Operation failed: " << e.what() << ", retries left: " << retriesLeft << std::endl;
return attempt(retriesLeft - 1);
} else {
std::cerr << "Operation failed after max retries: " << e.what() << std::endl;
throw; // 重新抛出异常
}
});
};

attempt(maxRetries).wait();
}

#条件循环执行异步操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
void useWhileDo() {
using namespace std::chrono_literals;

const auto start = std::chrono::steady_clock::now();
std::atomic<bool> done{false};

folly::whileDo([&]() { return !done && std::chrono::steady_clock::now() - start < 5s; },
[&]() {
return asyncOperation()
.thenValue([&](int value) {
done = true;
std::cout << "Success value: " << value << std::endl
<< "Elapsed time: "
<< std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::steady_clock::now() - start)
.count()
<< " seconds" << std::endl;
})
.thenError<std::exception>([&](std::exception &&e) {
std::cerr << "Operation failed: " << e.what() << std::endl;
return folly::futures::sleep(500ms);
});
})
.then([&](auto &&) {
if (!done) {
std::cout << "Operation did not succeed within timeout 5 seconds." << std::endl;
}
})
.wait();
}