Rust: работа с потоками


Жил я себе поживал раньше без забот и без хлопот в однопоточной счастливой стране JavaScript, где имел дело с потоками разве что при взаимодействии между веб-сайтом и расширением для Chrome. Поэтому, когда кто-то заводил разговор о трудностях параллелизма и конкурентности, я никогда по-настоящему не понимал, из-за чего весь сыр-бор.

Я начал изучать Rust несколько недель назад, переписывая текстовую игру, которую до этого сделал с помощью Vue. Это игра на выживание, в которой нужно собирать и изготавливать предметы, чтобы есть и пить. Условие победы здесь одно — постараться продержаться как можно большее количество дней. Мне удалось привести в рабочее состояние большинство игровых функций, но была досадная ошибка: если пользователь уходил на несколько часов из игры, нельзя было проверить статистику, пока он не вернётся. А иногда проходило по нескольку месяцев без изменений!

Я знал, что эту проблему можно решить с помощью потоков, поэтому наконец набрался смелости и прочитал главу «Многопоточность без страха» из «Языка программирования Rust» от авторов Steve Klabnik, Carol Nichols и участников сообщества Rust.

Итак, мне нужно было получить возможность отслеживать статистику и вести подсчёт дней каждые несколько секунд, а также уведомлять игрока, как только статистика достигнет 0. Создать новый поток, который запускает какой-то код каждые 10 секунд, было легко:

thread::spawn(move || loop { thread::sleep(Duration::from_secs(10)); println!("Now we should decrease stats and update day count…"); });

Но как изменить статистику и подсчёт дней из этого потока, избежав проблем с владением?

Всё оказалось намного проще, чем я ожидал, ведь можно создать мьютекс (взаимное исключение), чтобы в каждый конкретный момент времени только один поток мог получить доступ к этим данным. Завладеть этим мьютексом будет стремиться несколько потоков, поэтому нужно обернуть его в Arc (автоматический подсчёт ссылок), чтобы код работал правильно (всё это гораздо лучше объяснено в главе «Многопоточное разделяемое состояние»). Код в конечном итоге выглядит так:

fn main() { let stats = Arc::new(Mutex::new(Stats { water: Stat::new(100.0), food: Stat::new(100.0), energy: Stat::new(100.0), })); let days = Arc::new(Mutex::new(0)); control_time(&days, &stats); // ... } fn control_time(days: &Arc<Mutex<i32>>, stats: &Arc<Mutex<Stats>>) { let now = Instant::now(); let days = Arc::clone(&days); let stats = Arc::clone(&stats); thread::spawn(move || loop { thread::sleep(Duration::from_secs(10)); let elapsed_time = now.elapsed().as_secs(); let mut elapsed_days = days.lock().unwrap(); *elapsed_days = elapsed_time as i32 / 60; let mut stats_lock = stats.lock().unwrap(); decrease_stats(&mut stats_lock, 10.0); }); }

В основном потоке можем оставить days (дни) и stats (статистику) и просто добавить .lock().

И всё работает хорошо… Но проблема, похоже, осталась: основной поток занят в ожидании данных от пользователя! Несмотря на то, что статистика и подсчёт дней успешно обновляются каждые 10 секунд, основной поток ни сном ни духом об этом не ведает. Пришло время добавить ещё один поток!

Этот поток должен обработать данные от пользователя и отправить действие в основной поток. Для этого лучше использовать другой способ передачи данных между потоками, который изложен в соседней главе «Языка программирования Rust» о передаче сообщений.

Для отправки действия в основной поток мне нужен был канал, а основной поток должен был дать знать новому потоку с данными от пользователя, когда он готов принять действие (так как на некоторые действия нужно время). Каналы, которые Rust предлагает в стандартной библиотеке, идут по типу несколько производителей — один потребитель. Это означает, что между каналами двустороннее взаимодействие отсутствует, поэтому в итоге я создал два канала:

let (tx, rx) = mpsc::channel(); let (tx2, rx2) = mpsc::channel();

Я особо не заморачивался с именами.

Теперь создаём новый поток, который будет ждать сигнала от основного потока, запросит данные от пользователя и отправит их обратно в основной поток. Обратите внимание, что rx2.recv() блокирует поток до тех пор, пока не будет получено сообщение: это позволит нам контролировать, когда отправить запрос данных пользователю.

thread::spawn(move || loop { let _ = rx2.recv(); let action = request_input("\nWhat to do?"); tx.send(action).ok(); });

Затем из основного потока отправляем сообщение для запроса входных данных и переходим к созданию цикла, который будет постоянно проверять статистику и данные от пользователя с помощью rx.try_recv() (это не блокирует поток). Если статистика достигает 0, цикл обрывается, завершая игру. Если не достигает 0, снова запрашиваем входные данные.

tx2.send(String::from("Ready for input")).ok(); loop { if let Ok(action) = rx.try_recv() { match action.trim() { // обрабатываются все возможные действия } } if is_game_over(&stats.lock().unwrap()) { break; } else { tx2.send(String::from("Ready for input")).ok(); } }

Для меня это было совершенно естественно: всё равно что отправить событие в JavaScript! Но вообще-то не совсем.

Когда вы отправляете событие в JavaScript, никому нет дела, слушает кто-то это событие или нет. Вы отправляете его, и это сообщение навсегда теряется, если нет слушателя.

В стране Rust, если в лесу падает дерево, должен быть кто-то, кто это услышит. В противном случае лес запаникует и сожжёт сам себя, а мир взорвётся. А если этот кто-то занят другими делами, все деревья будут ждать в очереди и не упадут, пока этот кто-то не начнет их слышать. Вот что у нас происходило:

Входной поток не ждёт сообщения о готовности ready от основного потока. Однако проблема в том, что основной поток постоянно отправляет сообщения. Не забывайте, что действие action прослушивается через try_recv, поэтому оно не блокируется. Даже в том случае, когда пользователь вводит sleep, основной поток реально спит в течение нескольких секунд из-за того, что мы уже завалили его сообщениями, и входной поток получает сообщения одно за другим. Для тех, кто привык к другим языкам, это может показаться естественным, но не пришельцам из JavaScript: это взорвало мне мозг и долгое время не укладывалось в голове.

В итоге я нашёл простое решение: отправлять сообщение о готовности ready только после того, как получено и обработано последнее сообщение:

tx2.send(String::from("Ready for input")).ok(); loop { if let Ok(action) = rx.try_recv() { match action.trim() { // обрабатываются все возможные действия } // теперь мы готовы к другому действию: tx2.send(String::from("Ready for input")).ok(); } if is_game_over(&stats.lock().unwrap()) { break; } }

Вот и всё, все проблемы решены! 

За те несколько недель, что я изучаю Rust, трудным мне показался не сам язык — трудно было оставить прежний образ мыслей JavaScript-разработчика (смелее переходите с JavaScript на выбранный вами язык). Именно это мне больше всего и нравится — выходить из зоны комфорта! Загляните в код игры здесь: https://github.com/codegram/live-rust


Перевод статьи Núria: Learning Rust: Working with threads


Поделиться статьей:


Вернуться к статьям

Комментарии

    Ничего не найдено.